/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.phoenix.coprocessor;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static org.apache.hadoop.hbase.KeyValueUtil.createFirstOnRow;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARRAY_SIZE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.AUTO_PARTITION_SEQ_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME_INDEX;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODING_SCHEME_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FAMILY_NAME_INDEX;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ARRAY_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_CONSTANT_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_NAMESPACE_MAPPED_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.JAR_PATH_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MULTI_TENANT_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NULLABLE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_ARGS_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PARENT_TENANT_ID_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PK_NAME_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORAGE_SCHEME_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME_INDEX;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID_INDEX;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTIONAL_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTION_PROVIDER_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USE_STATS_FOR_PARALLELIZATION_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE_BYTES;
import static org.apache.phoenix.query.QueryConstants.DIVERGED_VIEW_BASE_COLUMN_COUNT;
import static org.apache.phoenix.query.QueryConstants.SEPARATOR_BYTE_ARRAY;
import static org.apache.phoenix.schema.PTableType.INDEX;
import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
import static org.apache.phoenix.util.SchemaUtil.getVarCharLength;
import static org.apache.phoenix.util.SchemaUtil.getVarChars;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.RpcServer.Call;
import org.apache.hadoop.hbase.ipc.RpcUtil;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Region.RowLock;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.phoenix.cache.GlobalCache;
import org.apache.phoenix.cache.GlobalCache.FunctionBytesPtr;
import org.apache.phoenix.compile.ColumnResolver;
import org.apache.phoenix.compile.FromCompiler;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.compile.WhereCompiler;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.AddColumnRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearTableFromCacheRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearTableFromCacheResponse;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.CreateFunctionRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.CreateSchemaRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.CreateTableRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropColumnRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropFunctionRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropSchemaRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropTableRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetFunctionsRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetSchemaRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetTableRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionResponse;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.KeyValueColumnExpression;
import org.apache.phoenix.expression.LiteralExpression;
import org.apache.phoenix.expression.ProjectedColumnExpression;
import org.apache.phoenix.expression.RowKeyColumnExpression;
import org.apache.phoenix.expression.visitor.StatelessTraverseAllExpressionVisitor;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.metrics.Metrics;
import org.apache.phoenix.parse.LiteralParseNode;
import org.apache.phoenix.parse.PFunction;
import org.apache.phoenix.parse.PFunction.FunctionArgument;
import org.apache.phoenix.parse.PSchema;
import org.apache.phoenix.parse.ParseNode;
import org.apache.phoenix.parse.SQLParser;
import org.apache.phoenix.protobuf.ProtobufUtil;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
import org.apache.phoenix.schema.ColumnNotFoundException;
import org.apache.phoenix.schema.ColumnRef;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PColumnFamily;
import org.apache.phoenix.schema.PColumnImpl;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PMetaDataEntity;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PNameFactory;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.EncodedCQCounter;
import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.PTable.LinkType;
import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
import org.apache.phoenix.schema.PTable.ViewType;
import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.SequenceAllocation;
import org.apache.phoenix.schema.SequenceAlreadyExistsException;
import org.apache.phoenix.schema.SequenceKey;
import org.apache.phoenix.schema.SequenceNotFoundException;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.TableProperty;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.ResultTuple;
import org.apache.phoenix.schema.types.PBinary;
import org.apache.phoenix.schema.types.PBoolean;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.schema.types.PSmallint;
import org.apache.phoenix.schema.types.PTinyint;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.KeyValueUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.UpgradeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.cache.Cache;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;

/**
 *
 * Endpoint co-processor through which all Phoenix metadata mutations flow.
 * We only allow mutations to the latest version of a Phoenix table (i.e. the
 * timeStamp must be increasing).
 * For adding/dropping columns use a sequence number on the table to ensure that
 * the client has the latest version.
 * The timeStamp on the table correlates with the timeStamp on the data row.
 * TODO: we should enforce that a metadata mutation uses a timeStamp bigger than
 * any in use on the data table, b/c otherwise we can end up with data rows that
 * are not valid against a schema row.
 *
 *
 * @since 0.1
 */
@SuppressWarnings("deprecation")
public class MetaDataEndpointImpl extends MetaDataProtocol implements CoprocessorService, Coprocessor {
    // SLF4J logger shared by all instances of this endpoint.
    private static final Logger logger = LoggerFactory.getLogger(MetaDataEndpointImpl.class);

    // Column to track tables that have been upgraded based on PHOENIX-2067
    public static final String ROW_KEY_ORDER_OPTIMIZABLE = "ROW_KEY_ORDER_OPTIMIZABLE";
    // Byte form of the column name above; used as the qualifier of ROW_KEY_ORDER_OPTIMIZABLE_KV.
    public static final byte[] ROW_KEY_ORDER_OPTIMIZABLE_BYTES = Bytes.toBytes(ROW_KEY_ORDER_OPTIMIZABLE);

    // One-byte arrays holding the serialized value of the CHILD_TABLE and PHYSICAL_TABLE link types.
    private static final byte[] CHILD_TABLE_BYTES = new byte[] {PTable.LinkType.CHILD_TABLE.getSerializedValue()};
    private static final byte[] PHYSICAL_TABLE_BYTES =
            new byte[] { PTable.LinkType.PHYSICAL_TABLE.getSerializedValue() };

    // KeyValues for Table
    // One lookup KeyValue per table-header column. Each is built over an empty row key in
    // TABLE_FAMILY_BYTES with the column's qualifier, so only family and qualifier differ
    // between them. They are collected into TABLE_KV_COLUMNS, and their positions in that
    // sorted list become the *_INDEX constants.
    private static final KeyValue TABLE_TYPE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES);
    private static final KeyValue TABLE_SEQ_NUM_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
    private static final KeyValue COLUMN_COUNT_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_COUNT_BYTES);
    private static final KeyValue SALT_BUCKETS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, SALT_BUCKETS_BYTES);
    private static final KeyValue PK_NAME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, PK_NAME_BYTES);
    private static final KeyValue DATA_TABLE_NAME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DATA_TABLE_NAME_BYTES);
    private static final KeyValue INDEX_STATE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_STATE_BYTES);
    private static final KeyValue IMMUTABLE_ROWS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IMMUTABLE_ROWS_BYTES);
    // Note: named VIEW_EXPRESSION but keyed on the VIEW_STATEMENT qualifier.
    private static final KeyValue VIEW_EXPRESSION_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_STATEMENT_BYTES);
    private static final KeyValue DEFAULT_COLUMN_FAMILY_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DEFAULT_COLUMN_FAMILY_NAME_BYTES);
    private static final KeyValue DISABLE_WAL_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DISABLE_WAL_BYTES);
    private static final KeyValue MULTI_TENANT_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, MULTI_TENANT_BYTES);
    private static final KeyValue VIEW_TYPE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_TYPE_BYTES);
    private static final KeyValue VIEW_INDEX_ID_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_INDEX_ID_BYTES);
    private static final KeyValue INDEX_TYPE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_TYPE_BYTES);
    private static final KeyValue INDEX_DISABLE_TIMESTAMP_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES);
    private static final KeyValue STORE_NULLS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, STORE_NULLS_BYTES);
    // Shared with FUNCTION_KV_COLUMNS: the empty-column marker appears in both header lists.
    private static final KeyValue EMPTY_KEYVALUE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES);
    private static final KeyValue BASE_COLUMN_COUNT_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.BASE_COLUMN_COUNT_BYTES);
    private static final KeyValue ROW_KEY_ORDER_OPTIMIZABLE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ROW_KEY_ORDER_OPTIMIZABLE_BYTES);
    private static final KeyValue TRANSACTIONAL_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TRANSACTIONAL_BYTES);
    private static final KeyValue TRANSACTION_PROVIDER_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TRANSACTION_PROVIDER_BYTES);
    private static final KeyValue UPDATE_CACHE_FREQUENCY_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, UPDATE_CACHE_FREQUENCY_BYTES);
    private static final KeyValue IS_NAMESPACE_MAPPED_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY,
            TABLE_FAMILY_BYTES, IS_NAMESPACE_MAPPED_BYTES);
    private static final KeyValue AUTO_PARTITION_SEQ_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, AUTO_PARTITION_SEQ_BYTES);
    private static final KeyValue APPEND_ONLY_SCHEMA_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, APPEND_ONLY_SCHEMA_BYTES);
    private static final KeyValue STORAGE_SCHEME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, STORAGE_SCHEME_BYTES);
    private static final KeyValue ENCODING_SCHEME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ENCODING_SCHEME_BYTES);
    private static final KeyValue USE_STATS_FOR_PARALLELIZATION_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, USE_STATS_FOR_PARALLELIZATION_BYTES);
    
    // All table-header lookup KeyValues. The declaration order below is irrelevant: the static
    // block that follows sorts the list in place with KeyValue.COMPARATOR, and the *_INDEX
    // constants are computed from the sorted positions. Arrays.asList yields a fixed-size list
    // backed by the array, so it can be sorted but not grown or shrunk.
    private static final List<KeyValue> TABLE_KV_COLUMNS = Arrays.<KeyValue>asList(
            EMPTY_KEYVALUE_KV,
            TABLE_TYPE_KV,
            TABLE_SEQ_NUM_KV,
            COLUMN_COUNT_KV,
            SALT_BUCKETS_KV,
            PK_NAME_KV,
            DATA_TABLE_NAME_KV,
            INDEX_STATE_KV,
            IMMUTABLE_ROWS_KV,
            VIEW_EXPRESSION_KV,
            DEFAULT_COLUMN_FAMILY_KV,
            DISABLE_WAL_KV,
            MULTI_TENANT_KV,
            VIEW_TYPE_KV,
            VIEW_INDEX_ID_KV,
            INDEX_TYPE_KV,
            INDEX_DISABLE_TIMESTAMP_KV,
            STORE_NULLS_KV,
            BASE_COLUMN_COUNT_KV,
            ROW_KEY_ORDER_OPTIMIZABLE_KV,
            TRANSACTIONAL_KV,
            TRANSACTION_PROVIDER_KV,
            UPDATE_CACHE_FREQUENCY_KV,
            IS_NAMESPACE_MAPPED_KV,
            AUTO_PARTITION_SEQ_KV,
            APPEND_ONLY_SCHEMA_KV,
            STORAGE_SCHEME_KV,
            ENCODING_SCHEME_KV,
            USE_STATS_FOR_PARALLELIZATION_KV
            );
    static {
        // Must run before any TABLE_KV_COLUMNS.indexOf(...) constant below is initialized;
        // static initializers execute in textual order, so this block has to stay here.
        Collections.sort(TABLE_KV_COLUMNS, KeyValue.COMPARATOR);
    }

    // Position of each table-header KeyValue within the sorted TABLE_KV_COLUMNS list.
    // Presumably used to index into an array of cells matched against TABLE_KV_COLUMNS when a
    // table header row is read -- confirm against the usages further down in this class.
    private static final int TABLE_TYPE_INDEX = TABLE_KV_COLUMNS.indexOf(TABLE_TYPE_KV);
    private static final int TABLE_SEQ_NUM_INDEX = TABLE_KV_COLUMNS.indexOf(TABLE_SEQ_NUM_KV);
    private static final int COLUMN_COUNT_INDEX = TABLE_KV_COLUMNS.indexOf(COLUMN_COUNT_KV);
    private static final int SALT_BUCKETS_INDEX = TABLE_KV_COLUMNS.indexOf(SALT_BUCKETS_KV);
    private static final int PK_NAME_INDEX = TABLE_KV_COLUMNS.indexOf(PK_NAME_KV);
    private static final int DATA_TABLE_NAME_INDEX = TABLE_KV_COLUMNS.indexOf(DATA_TABLE_NAME_KV);
    private static final int INDEX_STATE_INDEX = TABLE_KV_COLUMNS.indexOf(INDEX_STATE_KV);
    private static final int IMMUTABLE_ROWS_INDEX = TABLE_KV_COLUMNS.indexOf(IMMUTABLE_ROWS_KV);
    // Position of VIEW_EXPRESSION_KV (the VIEW_STATEMENT column).
    private static final int VIEW_STATEMENT_INDEX = TABLE_KV_COLUMNS.indexOf(VIEW_EXPRESSION_KV);
    private static final int DEFAULT_COLUMN_FAMILY_INDEX = TABLE_KV_COLUMNS.indexOf(DEFAULT_COLUMN_FAMILY_KV);
    private static final int DISABLE_WAL_INDEX = TABLE_KV_COLUMNS.indexOf(DISABLE_WAL_KV);
    private static final int MULTI_TENANT_INDEX = TABLE_KV_COLUMNS.indexOf(MULTI_TENANT_KV);
    private static final int VIEW_TYPE_INDEX = TABLE_KV_COLUMNS.indexOf(VIEW_TYPE_KV);
    private static final int VIEW_INDEX_ID_INDEX = TABLE_KV_COLUMNS.indexOf(VIEW_INDEX_ID_KV);
    private static final int INDEX_TYPE_INDEX = TABLE_KV_COLUMNS.indexOf(INDEX_TYPE_KV);
    private static final int STORE_NULLS_INDEX = TABLE_KV_COLUMNS.indexOf(STORE_NULLS_KV);
    private static final int BASE_COLUMN_COUNT_INDEX = TABLE_KV_COLUMNS.indexOf(BASE_COLUMN_COUNT_KV);
    private static final int ROW_KEY_ORDER_OPTIMIZABLE_INDEX = TABLE_KV_COLUMNS.indexOf(ROW_KEY_ORDER_OPTIMIZABLE_KV);
    private static final int TRANSACTIONAL_INDEX = TABLE_KV_COLUMNS.indexOf(TRANSACTIONAL_KV);
    private static final int TRANSACTION_PROVIDER_INDEX = TABLE_KV_COLUMNS.indexOf(TRANSACTION_PROVIDER_KV);
    private static final int UPDATE_CACHE_FREQUENCY_INDEX = TABLE_KV_COLUMNS.indexOf(UPDATE_CACHE_FREQUENCY_KV);
    // NOTE(review): unlike its siblings this constant has no _INDEX suffix, although it is
    // also a list position (of INDEX_DISABLE_TIMESTAMP_KV), not a timestamp value.
    private static final int INDEX_DISABLE_TIMESTAMP = TABLE_KV_COLUMNS.indexOf(INDEX_DISABLE_TIMESTAMP_KV);
    private static final int IS_NAMESPACE_MAPPED_INDEX = TABLE_KV_COLUMNS.indexOf(IS_NAMESPACE_MAPPED_KV);
    private static final int AUTO_PARTITION_SEQ_INDEX = TABLE_KV_COLUMNS.indexOf(AUTO_PARTITION_SEQ_KV);
    private static final int APPEND_ONLY_SCHEMA_INDEX = TABLE_KV_COLUMNS.indexOf(APPEND_ONLY_SCHEMA_KV);
    private static final int STORAGE_SCHEME_INDEX = TABLE_KV_COLUMNS.indexOf(STORAGE_SCHEME_KV);
    // Position of ENCODING_SCHEME_KV.
    private static final int QUALIFIER_ENCODING_SCHEME_INDEX = TABLE_KV_COLUMNS.indexOf(ENCODING_SCHEME_KV);
    private static final int USE_STATS_FOR_PARALLELIZATION_INDEX = TABLE_KV_COLUMNS.indexOf(USE_STATS_FOR_PARALLELIZATION_KV);

    // KeyValues for Column
    // One lookup KeyValue per column-row attribute, built the same way as the table-header
    // ones: empty row key, TABLE_FAMILY_BYTES, and the attribute's qualifier.
    private static final KeyValue DECIMAL_DIGITS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES);
    private static final KeyValue COLUMN_SIZE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_SIZE_BYTES);
    private static final KeyValue NULLABLE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, NULLABLE_BYTES);
    private static final KeyValue DATA_TYPE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DATA_TYPE_BYTES);
    private static final KeyValue ORDINAL_POSITION_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ORDINAL_POSITION_BYTES);
    private static final KeyValue SORT_ORDER_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, SORT_ORDER_BYTES);
    private static final KeyValue ARRAY_SIZE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ARRAY_SIZE_BYTES);
    private static final KeyValue VIEW_CONSTANT_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_CONSTANT_BYTES);
    private static final KeyValue IS_VIEW_REFERENCED_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_VIEW_REFERENCED_BYTES);
    private static final KeyValue COLUMN_DEF_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_DEF_BYTES);
    private static final KeyValue IS_ROW_TIMESTAMP_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_ROW_TIMESTAMP_BYTES);
    private static final KeyValue COLUMN_QUALIFIER_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_QUALIFIER_BYTES);

    // All column-row lookup KeyValues; sorted in place by the static block below, so the
    // declaration order here does not determine the *_INDEX values derived from this list.
    private static final List<KeyValue> COLUMN_KV_COLUMNS = Arrays.<KeyValue>asList(
            DECIMAL_DIGITS_KV,
            COLUMN_SIZE_KV,
            NULLABLE_KV,
            DATA_TYPE_KV,
            ORDINAL_POSITION_KV,
            SORT_ORDER_KV,
            DATA_TABLE_NAME_KV, // included in both column and table row for metadata APIs
            ARRAY_SIZE_KV,
            VIEW_CONSTANT_KV,
            IS_VIEW_REFERENCED_KV,
            COLUMN_DEF_KV,
            IS_ROW_TIMESTAMP_KV,
            COLUMN_QUALIFIER_KV
            );
    static {
        // Must precede the COLUMN_KV_COLUMNS.indexOf(...) constants: static initializers run
        // in textual order and the indices are positions in the sorted list.
        Collections.sort(COLUMN_KV_COLUMNS, KeyValue.COMPARATOR);
    }
    // Lookup KeyValue for the COLUMN_QUALIFIER_COUNTER column. It is declared after the sort of
    // COLUMN_KV_COLUMNS and is not a member of that list. Built with the statically imported
    // KeyValueUtil.createFirstOnRow, like every other *_KV constant in this class, instead of
    // the equivalent factory on KeyValue.
    private static final KeyValue QUALIFIER_COUNTER_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_QUALIFIER_COUNTER_BYTES);
    // Position of each column-row KeyValue within the sorted COLUMN_KV_COLUMNS list.
    // DATA_TABLE_NAME_KV is also in that list, but no index for it is derived from
    // COLUMN_KV_COLUMNS here (DATA_TABLE_NAME_INDEX refers to TABLE_KV_COLUMNS).
    private static final int DECIMAL_DIGITS_INDEX = COLUMN_KV_COLUMNS.indexOf(DECIMAL_DIGITS_KV);
    private static final int COLUMN_SIZE_INDEX = COLUMN_KV_COLUMNS.indexOf(COLUMN_SIZE_KV);
    private static final int NULLABLE_INDEX = COLUMN_KV_COLUMNS.indexOf(NULLABLE_KV);
    private static final int DATA_TYPE_INDEX = COLUMN_KV_COLUMNS.indexOf(DATA_TYPE_KV);
    private static final int ORDINAL_POSITION_INDEX = COLUMN_KV_COLUMNS.indexOf(ORDINAL_POSITION_KV);
    private static final int SORT_ORDER_INDEX = COLUMN_KV_COLUMNS.indexOf(SORT_ORDER_KV);
    private static final int ARRAY_SIZE_INDEX = COLUMN_KV_COLUMNS.indexOf(ARRAY_SIZE_KV);
    private static final int VIEW_CONSTANT_INDEX = COLUMN_KV_COLUMNS.indexOf(VIEW_CONSTANT_KV);
    private static final int IS_VIEW_REFERENCED_INDEX = COLUMN_KV_COLUMNS.indexOf(IS_VIEW_REFERENCED_KV);
    private static final int COLUMN_DEF_INDEX = COLUMN_KV_COLUMNS.indexOf(COLUMN_DEF_KV);
    private static final int IS_ROW_TIMESTAMP_INDEX = COLUMN_KV_COLUMNS.indexOf(IS_ROW_TIMESTAMP_KV);
    private static final int COLUMN_QUALIFIER_INDEX = COLUMN_KV_COLUMNS.indexOf(COLUMN_QUALIFIER_KV);
    
    // Hard-coded rather than derived from a sorted KeyValue list like the other *_INDEX values.
    // NOTE(review): presumably the position of the LINK_TYPE cell among the cells read from a
    // link row -- confirm against the usages further down in this class.
    private static final int LINK_TYPE_INDEX = 0;

    // Lookup KeyValues for function metadata: empty row key, TABLE_FAMILY_BYTES, and the
    // attribute's qualifier. The first four feed FUNCTION_KV_COLUMNS (function header row);
    // the remaining six feed FUNCTION_ARG_KV_COLUMNS (function argument rows).
    private static final KeyValue CLASS_NAME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, CLASS_NAME_BYTES);
    private static final KeyValue JAR_PATH_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, JAR_PATH_BYTES);
    private static final KeyValue RETURN_TYPE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, RETURN_TYPE_BYTES);
    private static final KeyValue NUM_ARGS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, NUM_ARGS_BYTES);
    private static final KeyValue TYPE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TYPE_BYTES);
    private static final KeyValue IS_CONSTANT_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_CONSTANT_BYTES);
    private static final KeyValue DEFAULT_VALUE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DEFAULT_VALUE_BYTES);
    private static final KeyValue MIN_VALUE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, MIN_VALUE_BYTES);
    private static final KeyValue MAX_VALUE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, MAX_VALUE_BYTES);
    private static final KeyValue IS_ARRAY_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_ARRAY_BYTES);

    // Function-header lookup KeyValues (including the shared EMPTY_KEYVALUE_KV); sorted in
    // place by the static block below before any index is derived from the list.
    private static final List<KeyValue> FUNCTION_KV_COLUMNS = Arrays.<KeyValue>asList(
        EMPTY_KEYVALUE_KV,
        CLASS_NAME_KV,
        JAR_PATH_KV,
        RETURN_TYPE_KV,
        NUM_ARGS_KV
        );
    static {
        // Must precede the FUNCTION_KV_COLUMNS.indexOf(...) constants (textual init order).
        Collections.sort(FUNCTION_KV_COLUMNS, KeyValue.COMPARATOR);
    }
    
    // Position of each function-header KeyValue within the sorted FUNCTION_KV_COLUMNS list.
    private static final int CLASS_NAME_INDEX = FUNCTION_KV_COLUMNS.indexOf(CLASS_NAME_KV);
    private static final int JAR_PATH_INDEX = FUNCTION_KV_COLUMNS.indexOf(JAR_PATH_KV);
    private static final int RETURN_TYPE_INDEX = FUNCTION_KV_COLUMNS.indexOf(RETURN_TYPE_KV);
    private static final int NUM_ARGS_INDEX = FUNCTION_KV_COLUMNS.indexOf(NUM_ARGS_KV);

    // Function-argument lookup KeyValues; sorted in place by the static block below before
    // any index is derived from the list.
    private static final List<KeyValue> FUNCTION_ARG_KV_COLUMNS = Arrays.<KeyValue>asList(
        TYPE_KV,
        IS_ARRAY_KV,
        IS_CONSTANT_KV,
        DEFAULT_VALUE_KV,
        MIN_VALUE_KV,
        MAX_VALUE_KV
        );
    static {
        // Must precede the FUNCTION_ARG_KV_COLUMNS.indexOf(...) constants (textual init order).
        Collections.sort(FUNCTION_ARG_KV_COLUMNS, KeyValue.COMPARATOR);
    }
    
    // Position of each function-argument KeyValue within the sorted FUNCTION_ARG_KV_COLUMNS
    // list. TYPE_KV is a member of that list, but no index constant for it is declared here.
    private static final int IS_ARRAY_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(IS_ARRAY_KV);
    private static final int IS_CONSTANT_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(IS_CONSTANT_KV);
    private static final int DEFAULT_VALUE_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(DEFAULT_VALUE_KV);
    private static final int MIN_VALUE_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(MIN_VALUE_KV);
    private static final int MAX_VALUE_INDEX = FUNCTION_ARG_KV_COLUMNS.indexOf(MAX_VALUE_KV);

    /**
     * Builds a {@link PName} from a slice of a row-key buffer.
     *
     * @param keyBuffer buffer holding the name bytes
     * @param keyOffset offset in {@code keyBuffer} at which the name starts
     * @param keyLength number of bytes available for the name starting at {@code keyOffset}
     * @return {@code null} when {@code keyLength} is zero or negative; otherwise the name
     *         created by {@link PNameFactory#newName(byte[], int, int)} over {@code keyBuffer}
     *         at {@code keyOffset}, using the length reported by
     *         {@code SchemaUtil.getVarCharLength} for that slice
     */
    private static PName newPName(byte[] keyBuffer, int keyOffset, int keyLength) {
        // A non-positive length means there is no name at this position.
        return keyLength > 0
                ? PNameFactory.newName(keyBuffer, keyOffset,
                        getVarCharLength(keyBuffer, keyOffset, keyLength))
                : null;
    }

    // Region environment this endpoint was loaded into; assigned in start().
    private RegionCoprocessorEnvironment env;

    // Created in start() from the region environment.
    private PhoenixMetaDataCoprocessorHost phoenixAccessCoprocessorHost;
    // Read in start() from QueryServices.PHOENIX_ACLS_ENABLED.
    private boolean accessCheckEnabled;
    // Read in start() from QueryServices.INDEX_FAILURE_BLOCK_WRITE.
    private boolean blockWriteRebuildIndex;
    // Read in start() from QueryServices.MAX_INDEXES_PER_TABLE.
    private int maxIndexesPerTable;
    // Result of SchemaUtil.isNamespaceMappingEnabled(PTableType.TABLE, ...) computed in start().
    private boolean isTablesMappingEnabled;


    /**
     * Coprocessor lifecycle hook: captures the region environment and caches the configuration
     * settings this endpoint reads from it.
     * <p>
     * The supplied environment must be a {@link RegionCoprocessorEnvironment}, since an endpoint
     * coprocessor is expected to be loaded on a table region. It is stored in this instance and
     * used to create the {@link PhoenixMetaDataCoprocessorHost}. The ACL, block-write-on-index-
     * failure, max-indexes-per-table and namespace-mapping settings are then read from the
     * environment's {@link Configuration}. Finally {@code Tracing.addTraceMetricsSource()} and
     * {@code Metrics.ensureConfigured()} are invoked.
     *
     * @param env the environment provided by the coprocessor host
     * @throws IOException (as a {@link CoprocessorException}) if the provided environment is not
     *             an instance of {@code RegionCoprocessorEnvironment}
     */
    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        // Guard clause: refuse to start anywhere other than on a table region.
        if (!(env instanceof RegionCoprocessorEnvironment)) {
            throw new CoprocessorException("Must be loaded on a table region!");
        }
        this.env = (RegionCoprocessorEnvironment) env;
        phoenixAccessCoprocessorHost = new PhoenixMetaDataCoprocessorHost(this.env);

        // Cache the settings once so they need not be looked up again after start-up.
        Configuration conf = env.getConfiguration();
        accessCheckEnabled = conf.getBoolean(
                QueryServices.PHOENIX_ACLS_ENABLED,
                QueryServicesOptions.DEFAULT_PHOENIX_ACLS_ENABLED);
        blockWriteRebuildIndex = conf.getBoolean(
                QueryServices.INDEX_FAILURE_BLOCK_WRITE,
                QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
        maxIndexesPerTable = conf.getInt(
                QueryServices.MAX_INDEXES_PER_TABLE,
                QueryServicesOptions.DEFAULT_MAX_INDEXES_PER_TABLE);
        ReadOnlyProps props = new ReadOnlyProps(conf.iterator());
        isTablesMappingEnabled = SchemaUtil.isNamespaceMappingEnabled(PTableType.TABLE, props);

        // Start the phoenix trace collection
        logger.info("Starting Tracing-Metrics Systems");
        Tracing.addTraceMetricsSource();
        Metrics.ensureConfigured();
    }

    /**
     * Lifecycle hook invoked when the coprocessor is stopped. This implementation performs no
     * work.
     * @param env the environment provided by the coprocessor host (unused)
     * @throws IOException declared by the overridden method; never thrown here
     */
    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
        // nothing to do
    }

    /**
     * Returns the {@link Service} implemented by this endpoint.
     * @return this instance
     */
    @Override
    public Service getService() {
        return this;
    }

    /**
     * RPC entry point that looks up the metadata of one table and reports it through
     * {@code done}.
     * <p>
     * The response carries {@code TABLE_NOT_FOUND} when no table exists for the key, otherwise
     * {@code TABLE_ALREADY_EXISTS}. The serialized table is attached only when its timestamp
     * differs from the table timestamp supplied in the request. When block-write-on-index-failure
     * is enabled, the mutation time is frozen just before the smallest positive index disable
     * timestamp found on the table or on any of its ACTIVE, PENDING_ACTIVE or PENDING_DISABLE
     * indexes.
     * <p>
     * Any failure is logged and reported through {@code controller}; in that case {@code done}
     * is not invoked by this method.
     * @param controller receives the exception if the lookup fails
     * @param request tenant id, schema name, table name, timestamps and client version
     * @param done callback that receives the response
     */
    @Override
    public void getTable(RpcController controller, GetTableRequest request,
            RpcCallback<MetaDataResponse> done) {
        MetaDataResponse.Builder response = MetaDataResponse.newBuilder();
        byte[] tenantId = request.getTenantId().toByteArray();
        byte[] schemaName = request.getSchemaName().toByteArray();
        byte[] tableName = request.getTableName().toByteArray();
        byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
        long tableTimeStamp = request.getTableTimestamp();
        try {
            // TODO: check that key is within region.getStartKey() and region.getEndKey()
            // and return special code to force client to lookup region from meta.
            MetaDataMutationResult keyCheck = checkTableKeyInRegion(key, env.getRegion());
            if (keyCheck != null) {
                done.run(MetaDataMutationResult.toProto(keyCheck));
                return;
            }

            long now = EnvironmentEdgeManager.currentTimeMillis();
            PTable table = doGetTable(key, request.getClientTimestamp(), request.getClientVersion());
            if (table == null) {
                response.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
                response.setMutationTime(now);
                done.run(response.build());
                return;
            }
            getCoprocessorHost().preGetTable(Bytes.toString(tenantId),
                    SchemaUtil.getTableName(schemaName, tableName),
                    TableName.valueOf(table.getPhysicalName().getBytes()));

            response.setReturnCode(MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS);
            response.setMutationTime(now);
            if (blockWriteRebuildIndex) {
                long tableDisableTs = table.getIndexDisableTimestamp();
                long earliestDisableTs = tableDisableTs > 0 ? tableDisableTs : Long.MAX_VALUE;
                for (PTable index : table.getIndexes()) {
                    long indexDisableTs = index.getIndexDisableTimestamp();
                    if (indexDisableTs <= 0) {
                        continue;
                    }
                    PIndexState state = index.getIndexState();
                    boolean relevantState = state == PIndexState.ACTIVE
                            || state == PIndexState.PENDING_ACTIVE
                            || state == PIndexState.PENDING_DISABLE;
                    if (relevantState && indexDisableTs < earliestDisableTs) {
                        earliestDisableTs = indexDisableTs;
                    }
                }
                // Freeze time for table at min non-zero value of INDEX_DISABLE_TIMESTAMP.
                // This keeps the table consistent with the index, as the table has had one
                // more batch applied to it.
                if (earliestDisableTs != Long.MAX_VALUE) {
                    // Subtract one because we add one due to timestamp granularity in Windows
                    response.setMutationTime(earliestDisableTs - 1);
                }
            }

            // Only ship the table when the caller's copy has a different timestamp.
            if (table.getTimeStamp() != tableTimeStamp) {
                response.setTable(PTableImpl.toProto(table));
            }
            done.run(response.build());
        } catch (Throwable t) {
            logger.error("getTable failed", t);
            ProtobufUtil.setControllerException(controller,
                ServerUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), t));
        }
    }

    /**
     * Returns the coprocessor host created in {@code start}.
     * @return the host held in {@code phoenixAccessCoprocessorHost}; {@code null} if
     *         {@code start} has not assigned it yet
     */
    private PhoenixMetaDataCoprocessorHost getCoprocessorHost() {
        return phoenixAccessCoprocessorHost;
    }

    /**
     * Builds the {@link PTable} for {@code key} by scanning the table's metadata rows, and
     * stores it in the server-side metadata cache when appropriate.
     * <p>
     * The freshly built table replaces the cache entry when nothing is cached yet, when it is
     * newer than the cached table, or when block-write-on-index-failure is enabled and the new
     * table has a positive index disable timestamp.
     * @param key row key of the table header row
     * @param cacheKey key under which the table is cached
     * @param region region to scan
     * @param clientTimeStamp upper bound of the scan's time range
     * @param clientVersion client version, passed through to the row decoder
     * @return the table built from the scan, or {@code null} if the scan produced no table
     */
    private PTable buildTable(byte[] key, ImmutableBytesPtr cacheKey, Region region,
            long clientTimeStamp, int clientVersion) throws IOException, SQLException {
        Scan tableRowsScan = MetaDataUtil.newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp);
        Cache<ImmutableBytesPtr,PMetaDataEntity> cache = GlobalCache.getInstance(this.env).getMetaDataCache();
        try (RegionScanner rows = region.getScanner(tableRowsScan)) {
            PTable cached = (PTable) cache.getIfPresent(cacheKey);
            long cachedTimeStamp = MIN_TABLE_TIMESTAMP - 1;
            if (cached != null) {
                cachedTimeStamp = cached.getTimeStamp();
            }

            PTable fresh = getTable(rows, clientTimeStamp, cachedTimeStamp, clientVersion);
            if (fresh == null) {
                return null;
            }

            boolean shouldCache = cached == null
                    || cachedTimeStamp < fresh.getTimeStamp()
                    || (blockWriteRebuildIndex && fresh.getIndexDisableTimestamp() > 0);
            if (shouldCache) {
                if (logger.isDebugEnabled()) {
                    String cacheKeyText = Bytes.toStringBinary(cacheKey.get(),
                            cacheKey.getOffset(), cacheKey.getLength());
                    logger.debug("Caching table " + cacheKeyText + " at seqNum "
                            + fresh.getSequenceNumber() + " with newer timestamp "
                            + fresh.getTimeStamp() + " versus " + cachedTimeStamp);
                }
                cache.put(cacheKey, fresh);
            }
            return fresh;
        }
    }

    /**
     * Reads one {@link PFunction} per entry of {@code keys} from the region and stores each of
     * them in the server-side metadata cache under its function key.
     * @param keys row keys of the functions to read
     * @param region region to scan
     * @param clientTimeStamp upper bound of the scan's time range
     * @param isReplace passed through to {@code getFunction}
     * @param deleteMutationsForReplace passed through to {@code getFunction}
     * @return the functions in the order the scanner produced them, or {@code null} as soon as
     *         one function could not be read (functions read before that point stay cached)
     */
    private List<PFunction> buildFunctions(List<byte[]> keys, Region region,
            long clientTimeStamp, boolean isReplace, List<Mutation> deleteMutationsForReplace) throws IOException, SQLException {
        // One range per key: [key, stop key derived from key + separator).
        List<KeyRange> lookupRanges = Lists.newArrayListWithExpectedSize(keys.size());
        for (byte[] startKey : keys) {
            byte[] exclusiveStop = ByteUtil.concat(startKey, QueryConstants.SEPARATOR_BYTE_ARRAY);
            ByteUtil.nextKey(exclusiveStop, exclusiveStop.length);
            lookupRanges.add(PVarbinary.INSTANCE.getKeyRange(startKey, true, exclusiveStop, false));
        }

        Scan scan = new Scan();
        scan.setTimeRange(MIN_TABLE_TIMESTAMP, clientTimeStamp);
        ScanRanges pointLookup = ScanRanges.createPointLookup(lookupRanges);
        pointLookup.initializeScan(scan);
        scan.setFilter(pointLookup.getSkipScanFilter());

        Cache<ImmutableBytesPtr,PMetaDataEntity> cache = GlobalCache.getInstance(this.env).getMetaDataCache();
        List<PFunction> built = new ArrayList<PFunction>();
        try (RegionScanner rows = region.getScanner(scan)) {
            // Exactly one function is appended per iteration, so the result size doubles as
            // the loop counter.
            while (built.size() < keys.size()) {
                PFunction next = getFunction(rows, isReplace, clientTimeStamp, deleteMutationsForReplace);
                if (next == null) {
                    return null;
                }
                PName tenant = next.getTenantId();
                byte[] tenantBytes = tenant == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenant.getBytes();
                byte[] functionKey =
                        SchemaUtil.getFunctionKey(tenantBytes, Bytes.toBytes(next.getFunctionName()));
                cache.put(new FunctionBytesPtr(functionKey), next);
                built.add(next);
            }
            return built;
        }
    }

    /**
     * Reads one {@link PSchema} per entry of {@code keys} from the region and stores each of
     * them in the server-side metadata cache under {@code cacheKey}.
     * @param keys row keys of the schemas to read
     * @param region region to scan
     * @param clientTimeStamp upper bound of the scan's time range
     * @param cacheKey key under which every schema read here is cached
     * @return the schemas in the order the scanner produced them, or {@code null} as soon as
     *         one schema could not be read
     */
    private List<PSchema> buildSchemas(List<byte[]> keys, Region region, long clientTimeStamp,
            ImmutableBytesPtr cacheKey) throws IOException, SQLException {
        // One range per key: [key, stop key derived from key + separator).
        List<KeyRange> lookupRanges = Lists.newArrayListWithExpectedSize(keys.size());
        for (byte[] startKey : keys) {
            byte[] exclusiveStop = ByteUtil.concat(startKey, QueryConstants.SEPARATOR_BYTE_ARRAY);
            ByteUtil.nextKey(exclusiveStop, exclusiveStop.length);
            lookupRanges.add(PVarbinary.INSTANCE.getKeyRange(startKey, true, exclusiveStop, false));
        }

        Scan scan = new Scan();
        scan.setTimeRange(MIN_TABLE_TIMESTAMP, clientTimeStamp);
        ScanRanges pointLookup = ScanRanges.createPointLookup(lookupRanges);
        pointLookup.initializeScan(scan);
        scan.setFilter(pointLookup.getSkipScanFilter());

        Cache<ImmutableBytesPtr, PMetaDataEntity> cache = GlobalCache.getInstance(this.env).getMetaDataCache();
        List<PSchema> built = new ArrayList<PSchema>();
        try (RegionScanner rows = region.getScanner(scan)) {
            // Exactly one schema is appended per iteration, so the result size doubles as the
            // loop counter.
            while (built.size() < keys.size()) {
                PSchema next = getSchema(rows, clientTimeStamp);
                if (next == null) {
                    return null;
                }
                // NOTE(review): the same cacheKey is used for every schema, so with more than
                // one key a later schema overwrites the earlier cache entry - confirm that
                // callers only ever pass a single key.
                cache.put(cacheKey, next);
                built.add(next);
            }
            return built;
        }
    }

    /**
     * Looks up the index table {@code schemaName.indexName} and appends it to {@code indexes}.
     * @param tenantId tenant owning the index, or {@code null} for none
     * @param schemaName schema of the index
     * @param indexName name of the index table
     * @param tableName name of the data table (not used by this method)
     * @param clientTimeStamp timestamp passed to the table lookup
     * @param indexes list the resolved index table is appended to
     * @param clientVersion client version passed to the table lookup
     * @throws IOException wrapping a {@link TableNotFoundException} when the index table cannot
     *             be found
     */
    private void addIndexToTable(PName tenantId, PName schemaName, PName indexName, PName tableName, long clientTimeStamp, List<PTable> indexes, int clientVersion) throws IOException, SQLException {
        byte[] tenantIdBytes = ByteUtil.EMPTY_BYTE_ARRAY;
        if (tenantId != null) {
            tenantIdBytes = tenantId.getBytes();
        }
        byte[] indexKey = SchemaUtil.getTableKey(tenantIdBytes, schemaName.getBytes(), indexName.getBytes());
        PTable resolvedIndex = doGetTable(indexKey, clientTimeStamp, clientVersion);
        if (resolvedIndex != null) {
            indexes.add(resolvedIndex);
            return;
        }
        ServerUtil.throwIOException("Index not found", new TableNotFoundException(schemaName.getString(), indexName.getString()));
    }

    /**
     * Decodes the cells of one column metadata row into a {@link PColumn} and appends it to
     * {@code columns}.
     * <p>
     * The cells in {@code results} are matched against {@code COLUMN_KV_COLUMNS} by qualifier in
     * a single merge-style pass; slot {@code n} of {@code colKeyValues} receives the cell for
     * the n-th expected key value, or {@code null} if the row does not contain it.
     * @param results cells of the column row
     * @param colName name of the column
     * @param famName column family name; {@code null} (or a name with a {@code null} string)
     *            marks a primary key column
     * @param colKeyValues caller-supplied scratch array with one slot per entry of
     *            {@code COLUMN_KV_COLUMNS}; it is fully overwritten on every call
     * @param columns list the decoded column is appended to
     * @param isSalted whether the table is salted; shifts the column position by one
     * @throws IllegalStateException if the data type, nullability or ordinal position cell is
     *             missing from the row
     */
    private void addColumnToTable(List<Cell> results, PName colName, PName famName,
        Cell[] colKeyValues, List<PColumn> columns, boolean isSalted) {
        int i = 0;
        int j = 0;
        while (i < results.size() && j < COLUMN_KV_COLUMNS.size()) {
            Cell kv = results.get(i);
            Cell searchKv = COLUMN_KV_COLUMNS.get(j);
            int cmp =
                    Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(),
                        kv.getQualifierLength(), searchKv.getQualifierArray(),
                        searchKv.getQualifierOffset(), searchKv.getQualifierLength());
            if (cmp == 0) {
                colKeyValues[j++] = kv;
                i++;
            } else if (cmp > 0) {
                colKeyValues[j++] = null;
            } else {
                i++; // shouldn't happen - means unexpected KV in system table column row
            }
        }
        // The loop above stops as soon as the row's cells are exhausted. The scratch array is
        // supplied by the caller and may still hold cells from a previously decoded row, so
        // clear every slot that was not visited; otherwise stale cells would be attributed to
        // this column and could even satisfy the required-value check below.
        Arrays.fill(colKeyValues, j, colKeyValues.length, null);

        if (colKeyValues[DATA_TYPE_INDEX] == null || colKeyValues[NULLABLE_INDEX] == null
                || colKeyValues[ORDINAL_POSITION_INDEX] == null) {
            throw new IllegalStateException("Didn't find all required key values in '"
                    + colName.getString() + "' column metadata row");
        }

        Cell columnSizeKv = colKeyValues[COLUMN_SIZE_INDEX];
        Integer maxLength =
                columnSizeKv == null ? null : PInteger.INSTANCE.getCodec().decodeInt(
                    columnSizeKv.getValueArray(), columnSizeKv.getValueOffset(), SortOrder.getDefault());
        Cell decimalDigitKv = colKeyValues[DECIMAL_DIGITS_INDEX];
        Integer scale =
                decimalDigitKv == null ? null : PInteger.INSTANCE.getCodec().decodeInt(
                    decimalDigitKv.getValueArray(), decimalDigitKv.getValueOffset(), SortOrder.getDefault());
        Cell ordinalPositionKv = colKeyValues[ORDINAL_POSITION_INDEX];
        // A salted table shifts every column position by one.
        int position =
            PInteger.INSTANCE.getCodec().decodeInt(ordinalPositionKv.getValueArray(),
                    ordinalPositionKv.getValueOffset(), SortOrder.getDefault()) + (isSalted ? 1 : 0);
        Cell nullableKv = colKeyValues[NULLABLE_INDEX];
        boolean isNullable =
            PInteger.INSTANCE.getCodec().decodeInt(nullableKv.getValueArray(),
                    nullableKv.getValueOffset(), SortOrder.getDefault()) != ResultSetMetaData.columnNoNulls;
        Cell dataTypeKv = colKeyValues[DATA_TYPE_INDEX];
        PDataType dataType =
                PDataType.fromTypeId(PInteger.INSTANCE.getCodec().decodeInt(
                  dataTypeKv.getValueArray(), dataTypeKv.getValueOffset(), SortOrder.getDefault()));
        // For backward compatibility: a BINARY column without a max length is treated as
        // VARBINARY.
        if (maxLength == null && dataType == PBinary.INSTANCE) {
            dataType = PVarbinary.INSTANCE;
        }
        Cell sortOrderKv = colKeyValues[SORT_ORDER_INDEX];
        SortOrder sortOrder =
                sortOrderKv == null ? SortOrder.getDefault() : SortOrder.fromSystemValue(PInteger.INSTANCE
                        .getCodec().decodeInt(sortOrderKv.getValueArray(),
                                sortOrderKv.getValueOffset(), SortOrder.getDefault()));

        Cell arraySizeKv = colKeyValues[ARRAY_SIZE_INDEX];
        Integer arraySize = arraySizeKv == null ? null :
            PInteger.INSTANCE.getCodec().decodeInt(arraySizeKv.getValueArray(), arraySizeKv.getValueOffset(), SortOrder.getDefault());

        Cell viewConstantKv = colKeyValues[VIEW_CONSTANT_INDEX];
        byte[] viewConstant = viewConstantKv == null ? null : viewConstantKv.getValue();
        Cell isViewReferencedKv = colKeyValues[IS_VIEW_REFERENCED_INDEX];
        boolean isViewReferenced = isViewReferencedKv != null && Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(isViewReferencedKv.getValueArray(), isViewReferencedKv.getValueOffset(), isViewReferencedKv.getValueLength()));
        Cell columnDefKv = colKeyValues[COLUMN_DEF_INDEX];
        String expressionStr = columnDefKv==null ? null : (String)PVarchar.INSTANCE.toObject(columnDefKv.getValueArray(), columnDefKv.getValueOffset(), columnDefKv.getValueLength());
        Cell isRowTimestampKV = colKeyValues[IS_ROW_TIMESTAMP_INDEX];
        boolean isRowTimestamp =
                isRowTimestampKV != null && Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(
                        isRowTimestampKV.getValueArray(), isRowTimestampKV.getValueOffset(),
                        isRowTimestampKV.getValueLength()));

        boolean isPkColumn = famName == null || famName.getString() == null;
        Cell columnQualifierKV = colKeyValues[COLUMN_QUALIFIER_INDEX];
        // Older tables won't have column qualifier metadata present. To make things simpler, just set the
        // column qualifier bytes by using the column name.
        byte[] columnQualifierBytes = columnQualifierKV != null ?
                Arrays.copyOfRange(columnQualifierKV.getValueArray(),
                    columnQualifierKV.getValueOffset(), columnQualifierKV.getValueOffset()
                            + columnQualifierKV.getValueLength()) : (isPkColumn ? null : colName.getBytes());
        // NOTE(review): position-1 presumably converts a one-based stored ordinal to a zero-based
        // position - confirm against PColumnImpl.
        PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, false, columnQualifierBytes);
        columns.add(column);
    }
    
    /**
     * Decodes the cells of one function-argument row into a {@link FunctionArgument} and appends
     * it to {@code arguments}.
     * <p>
     * The cells in {@code results} are matched against {@code FUNCTION_ARG_KV_COLUMNS} by
     * qualifier in a single merge-style pass; slot {@code n} of {@code functionKeyValues}
     * receives the cell for the n-th expected key value, or {@code null} if the row does not
     * contain it.
     * @param results cells of the argument row
     * @param functionName name of the function the argument belongs to (not used by this method)
     * @param type type name of the argument
     * @param functionKeyValues caller-supplied scratch array with one slot per entry of
     *            {@code FUNCTION_ARG_KV_COLUMNS}; it is fully overwritten on every call
     * @param arguments list the decoded argument is appended to
     * @param argPosition position of the argument
     * @throws SQLException if the default, min or max value cannot be turned into a literal
     */
    private void addArgumentToFunction(List<Cell> results, PName functionName, PName type,
        Cell[] functionKeyValues, List<FunctionArgument> arguments, short argPosition) throws SQLException {
        int i = 0;
        int j = 0;
        while (i < results.size() && j < FUNCTION_ARG_KV_COLUMNS.size()) {
            Cell kv = results.get(i);
            Cell searchKv = FUNCTION_ARG_KV_COLUMNS.get(j);
            int cmp =
                    Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(),
                        kv.getQualifierLength(), searchKv.getQualifierArray(),
                        searchKv.getQualifierOffset(), searchKv.getQualifierLength());
            if (cmp == 0) {
                functionKeyValues[j++] = kv;
                i++;
            } else if (cmp > 0) {
                functionKeyValues[j++] = null;
            } else {
                i++; // shouldn't happen - means unexpected KV in system table column row
            }
        }
        // The loop above stops as soon as the row's cells are exhausted. The scratch array is
        // supplied by the caller and may still hold cells from a previously decoded argument,
        // so clear every slot that was not visited; otherwise stale cells would be attributed
        // to this argument.
        Arrays.fill(functionKeyValues, j, functionKeyValues.length, null);

        Cell isArrayKv = functionKeyValues[IS_ARRAY_INDEX];
        boolean isArrayType =
                isArrayKv != null && Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(
                    isArrayKv.getValueArray(), isArrayKv.getValueOffset(),
                    isArrayKv.getValueLength()));
        Cell isConstantKv = functionKeyValues[IS_CONSTANT_INDEX];
        boolean isConstant =
                isConstantKv != null && Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(
                    isConstantKv.getValueArray(), isConstantKv.getValueOffset(),
                    isConstantKv.getValueLength()));
        Cell defaultValueKv = functionKeyValues[DEFAULT_VALUE_INDEX];
        String defaultValue =
                defaultValueKv == null ? null : (String) PVarchar.INSTANCE.toObject(
                    defaultValueKv.getValueArray(), defaultValueKv.getValueOffset(),
                    defaultValueKv.getValueLength());
        Cell minValueKv = functionKeyValues[MIN_VALUE_INDEX];
        String minValue =
                minValueKv == null ? null : (String) PVarchar.INSTANCE.toObject(
                    minValueKv.getValueArray(), minValueKv.getValueOffset(),
                    minValueKv.getValueLength());
        Cell maxValueKv = functionKeyValues[MAX_VALUE_INDEX];
        String maxValue =
                maxValueKv == null ? null : (String) PVarchar.INSTANCE.toObject(
                    maxValueKv.getValueArray(), maxValueKv.getValueOffset(),
                    maxValueKv.getValueLength());
        // Absent default/min/max values stay null; present ones are wrapped as literal constants.
        FunctionArgument arg =
                new FunctionArgument(type.getString(), isArrayType, isConstant,
                        defaultValue == null ? null : LiteralExpression.newConstant((new LiteralParseNode(defaultValue)).getValue()),
                        minValue == null ? null : LiteralExpression.newConstant((new LiteralParseNode(minValue)).getValue()),
                        maxValue == null ? null : LiteralExpression.newConstant((new LiteralParseNode(maxValue)).getValue()),
                        argPosition);
        arguments.add(arg);
    }

    private PTable getTable(RegionScanner scanner, long clientTimeStamp, long tableTimeStamp, int clientVersion)
        throws IOException, SQLException {
        List<Cell> results = Lists.newArrayList();
        scanner.next(results);
        if (results.isEmpty()) {
            return null;
        }
        Cell[] tableKeyValues = new Cell[TABLE_KV_COLUMNS.size()];
        Cell[] colKeyValues = new Cell[COLUMN_KV_COLUMNS.size()];

        // Create PTable based on KeyValues from scan
        Cell keyValue = results.get(0);
        byte[] keyBuffer = keyValue.getRowArray();
        int keyLength = keyValue.getRowLength();
        int keyOffset = keyValue.getRowOffset();
        PName tenantId = newPName(keyBuffer, keyOffset, keyLength);
        int tenantIdLength = (tenantId == null) ? 0 : tenantId.getBytes().length;
        if (tenantIdLength == 0) {
            tenantId = null;
        }
        PName schemaName = newPName(keyBuffer, keyOffset+tenantIdLength+1, keyLength);
        int schemaNameLength = schemaName.getBytes().length;
        int tableNameLength = keyLength - schemaNameLength - 1 - tenantIdLength - 1;
        byte[] tableNameBytes = new byte[tableNameLength];
        System.arraycopy(keyBuffer, keyOffset + schemaNameLength + 1 + tenantIdLength + 1,
            tableNameBytes, 0, tableNameLength);
        PName tableName = PNameFactory.newName(tableNameBytes);

        int offset = tenantIdLength + schemaNameLength + tableNameLength + 3;
        // This will prevent the client from continually looking for the current
        // table when we know that there will never be one since we disallow updates
        // unless the table is the latest
        // If we already have a table newer than the one we just found and
        // the client timestamp is less that the existing table time stamp,
        // bump up the timeStamp to right before the client time stamp, since
        // we know it can't possibly change.
        long timeStamp = keyValue.getTimestamp();
        // long timeStamp = tableTimeStamp > keyValue.getTimestamp() &&
        // clientTimeStamp < tableTimeStamp
        // ? clientTimeStamp-1
        // : keyValue.getTimestamp();

        int i = 0;
        int j = 0;
        while (i < results.size() && j < TABLE_KV_COLUMNS.size()) {
            Cell kv = results.get(i);
            Cell searchKv = TABLE_KV_COLUMNS.get(j);
            int cmp =
                    Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(),
                        kv.getQualifierLength(), searchKv.getQualifierArray(),
                        searchKv.getQualifierOffset(), searchKv.getQualifierLength());
            if (cmp == 0) {
                timeStamp = Math.max(timeStamp, kv.getTimestamp()); // Find max timestamp of table
                                                                    // header row
                tableKeyValues[j++] = kv;
                i++;
            } else if (cmp > 0) {
                timeStamp = Math.max(timeStamp, kv.getTimestamp());
                tableKeyValues[j++] = null;
            } else {
                i++; // shouldn't happen - means unexpected KV in system table header row
            }
        }
        // TABLE_TYPE, TABLE_SEQ_NUM and COLUMN_COUNT are required.
        if (tableKeyValues[TABLE_TYPE_INDEX] == null || tableKeyValues[TABLE_SEQ_NUM_INDEX] == null
                || tableKeyValues[COLUMN_COUNT_INDEX] == null) {
            throw new IllegalStateException(
                    "Didn't find expected key values for table row in metadata row");
        }

        Cell tableTypeKv = tableKeyValues[TABLE_TYPE_INDEX];
        PTableType tableType =
                PTableType
                        .fromSerializedValue(tableTypeKv.getValueArray()[tableTypeKv.getValueOffset()]);
        Cell tableSeqNumKv = tableKeyValues[TABLE_SEQ_NUM_INDEX];
        long tableSeqNum =
            PLong.INSTANCE.getCodec().decodeLong(tableSeqNumKv.getValueArray(),
                    tableSeqNumKv.getValueOffset(), SortOrder.getDefault());
        Cell columnCountKv = tableKeyValues[COLUMN_COUNT_INDEX];
        int columnCount =
            PInteger.INSTANCE.getCodec().decodeInt(columnCountKv.getValueArray(),
                    columnCountKv.getValueOffset(), SortOrder.getDefault());
        Cell pkNameKv = tableKeyValues[PK_NAME_INDEX];
        PName pkName =
                pkNameKv != null ? newPName(pkNameKv.getValueArray(), pkNameKv.getValueOffset(),
                    pkNameKv.getValueLength()) : null;
        Cell saltBucketNumKv = tableKeyValues[SALT_BUCKETS_INDEX];
        Integer saltBucketNum =
                saltBucketNumKv != null ? (Integer) PInteger.INSTANCE.getCodec().decodeInt(
                    saltBucketNumKv.getValueArray(), saltBucketNumKv.getValueOffset(), SortOrder.getDefault()) : null;
        if (saltBucketNum != null && saltBucketNum.intValue() == 0) {
            saltBucketNum = null; // Zero salt buckets means not salted
        }
        Cell dataTableNameKv = tableKeyValues[DATA_TABLE_NAME_INDEX];
        PName dataTableName =
                dataTableNameKv != null ? newPName(dataTableNameKv.getValueArray(),
                    dataTableNameKv.getValueOffset(), dataTableNameKv.getValueLength()) : null;
        Cell indexStateKv = tableKeyValues[INDEX_STATE_INDEX];
        PIndexState indexState =
                indexStateKv == null ? null : PIndexState.fromSerializedValue(indexStateKv
                        .getValueArray()[indexStateKv.getValueOffset()]);
        // If client is not yet up to 4.12, then translate PENDING_ACTIVE to ACTIVE (as would have been
        // the value in those versions) since the client won't have this index state in its enum.
        if (indexState == PIndexState.PENDING_ACTIVE && clientVersion < MetaDataProtocol.MIN_PENDING_ACTIVE_INDEX) {
            indexState = PIndexState.ACTIVE;
        }
        // If client is not yet up to 4.14, then translate PENDING_DISABLE to DISABLE
        // since the client won't have this index state in its enum.
        if (indexState == PIndexState.PENDING_DISABLE && clientVersion < MetaDataProtocol.MIN_PENDING_DISABLE_INDEX) {
            // note: for older clients, we have to rely on the rebuilder to transition PENDING_DISABLE -> DISABLE
            indexState = PIndexState.DISABLE;
        }
        Cell immutableRowsKv = tableKeyValues[IMMUTABLE_ROWS_INDEX];
        boolean isImmutableRows =
                immutableRowsKv == null ? false : (Boolean) PBoolean.INSTANCE.toObject(
                    immutableRowsKv.getValueArray(), immutableRowsKv.getValueOffset(),
                    immutableRowsKv.getValueLength());
        Cell defaultFamilyNameKv = tableKeyValues[DEFAULT_COLUMN_FAMILY_INDEX];
        PName defaultFamilyName = defaultFamilyNameKv != null ? newPName(defaultFamilyNameKv.getValueArray(), defaultFamilyNameKv.getValueOffset(), defaultFamilyNameKv.getValueLength()) : null;
        Cell viewStatementKv = tableKeyValues[VIEW_STATEMENT_INDEX];
        String viewStatement = viewStatementKv != null ? (String) PVarchar.INSTANCE.toObject(viewStatementKv.getValueArray(), viewStatementKv.getValueOffset(),
                viewStatementKv.getValueLength()) : null;
        Cell disableWALKv = tableKeyValues[DISABLE_WAL_INDEX];
        boolean disableWAL = disableWALKv == null ? PTable.DEFAULT_DISABLE_WAL : Boolean.TRUE.equals(
            PBoolean.INSTANCE.toObject(disableWALKv.getValueArray(), disableWALKv.getValueOffset(), disableWALKv.getValueLength()));
        Cell multiTenantKv = tableKeyValues[MULTI_TENANT_INDEX];
        boolean multiTenant = multiTenantKv == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(multiTenantKv.getValueArray(), multiTenantKv.getValueOffset(), multiTenantKv.getValueLength()));
        Cell storeNullsKv = tableKeyValues[STORE_NULLS_INDEX];
        boolean storeNulls = storeNullsKv == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(storeNullsKv.getValueArray(), storeNullsKv.getValueOffset(), storeNullsKv.getValueLength()));
        Cell transactionalKv = tableKeyValues[TRANSACTIONAL_INDEX];
        Cell transactionProviderKv = tableKeyValues[TRANSACTION_PROVIDER_INDEX];
        TransactionFactory.Provider transactionProvider = null;
        if (transactionProviderKv == null) {
            if (transactionalKv != null && Boolean.TRUE.equals(
                    PBoolean.INSTANCE.toObject(
                            transactionalKv.getValueArray(), 
                            transactionalKv.getValueOffset(), 
                            transactionalKv.getValueLength()))) {
                // For backward compat, prior to client setting TRANSACTION_PROVIDER
                transactionProvider = TransactionFactory.Provider.TEPHRA;
            }
        } else {
            transactionProvider = TransactionFactory.Provider.fromCode(
                    PTinyint.INSTANCE.getCodec().decodeByte(
                            transactionProviderKv.getValueArray(),
                            transactionProviderKv.getValueOffset(), 
                            SortOrder.getDefault()));
        }
        Cell viewTypeKv = tableKeyValues[VIEW_TYPE_INDEX];
        ViewType viewType = viewTypeKv == null ? null : ViewType.fromSerializedValue(viewTypeKv.getValueArray()[viewTypeKv.getValueOffset()]);
        Cell viewIndexIdKv = tableKeyValues[VIEW_INDEX_ID_INDEX];
        Short viewIndexId = viewIndexIdKv == null ? null : (Short)MetaDataUtil.getViewIndexIdDataType().getCodec().decodeShort(viewIndexIdKv.getValueArray(), viewIndexIdKv.getValueOffset(), SortOrder.getDefault());
        Cell indexTypeKv = tableKeyValues[INDEX_TYPE_INDEX];
        IndexType indexType = indexTypeKv == null ? null : IndexType.fromSerializedValue(indexTypeKv.getValueArray()[indexTypeKv.getValueOffset()]);
        Cell baseColumnCountKv = tableKeyValues[BASE_COLUMN_COUNT_INDEX];
        int baseColumnCount = baseColumnCountKv == null ? 0 : PInteger.INSTANCE.getCodec().decodeInt(baseColumnCountKv.getValueArray(),
            baseColumnCountKv.getValueOffset(), SortOrder.getDefault());
        Cell rowKeyOrderOptimizableKv = tableKeyValues[ROW_KEY_ORDER_OPTIMIZABLE_INDEX];
        boolean rowKeyOrderOptimizable = rowKeyOrderOptimizableKv == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(rowKeyOrderOptimizableKv.getValueArray(), rowKeyOrderOptimizableKv.getValueOffset(), rowKeyOrderOptimizableKv.getValueLength()));
        Cell updateCacheFrequencyKv = tableKeyValues[UPDATE_CACHE_FREQUENCY_INDEX];
        long updateCacheFrequency = updateCacheFrequencyKv == null ? 0 :
            PLong.INSTANCE.getCodec().decodeLong(updateCacheFrequencyKv.getValueArray(),
                    updateCacheFrequencyKv.getValueOffset(), SortOrder.getDefault());
        Cell indexDisableTimestampKv = tableKeyValues[INDEX_DISABLE_TIMESTAMP];
        long indexDisableTimestamp = indexDisableTimestampKv == null ? 0L : PLong.INSTANCE.getCodec().decodeLong(indexDisableTimestampKv.getValueArray(),
                indexDisableTimestampKv.getValueOffset(), SortOrder.getDefault());
        Cell isNamespaceMappedKv = tableKeyValues[IS_NAMESPACE_MAPPED_INDEX];
        boolean isNamespaceMapped = isNamespaceMappedKv == null ? false
                : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(isNamespaceMappedKv.getValueArray(),
                        isNamespaceMappedKv.getValueOffset(), isNamespaceMappedKv.getValueLength()));
        Cell autoPartitionSeqKv = tableKeyValues[AUTO_PARTITION_SEQ_INDEX];
        String autoPartitionSeq = autoPartitionSeqKv != null ? (String) PVarchar.INSTANCE.toObject(autoPartitionSeqKv.getValueArray(), autoPartitionSeqKv.getValueOffset(),
            autoPartitionSeqKv.getValueLength()) : null;
        Cell isAppendOnlySchemaKv = tableKeyValues[APPEND_ONLY_SCHEMA_INDEX];
        boolean isAppendOnlySchema = isAppendOnlySchemaKv == null ? false
                : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(isAppendOnlySchemaKv.getValueArray(),
                    isAppendOnlySchemaKv.getValueOffset(), isAppendOnlySchemaKv.getValueLength()));
        Cell storageSchemeKv = tableKeyValues[STORAGE_SCHEME_INDEX];
        //TODO: change this once we start having other values for storage schemes
        ImmutableStorageScheme storageScheme = storageSchemeKv == null ? ImmutableStorageScheme.ONE_CELL_PER_COLUMN : ImmutableStorageScheme
                .fromSerializedValue((byte)PTinyint.INSTANCE.toObject(storageSchemeKv.getValueArray(),
                        storageSchemeKv.getValueOffset(), storageSchemeKv.getValueLength()));
        Cell encodingSchemeKv = tableKeyValues[QUALIFIER_ENCODING_SCHEME_INDEX];
        QualifierEncodingScheme encodingScheme = encodingSchemeKv == null ? QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : QualifierEncodingScheme
                .fromSerializedValue((byte)PTinyint.INSTANCE.toObject(encodingSchemeKv.getValueArray(),
                    encodingSchemeKv.getValueOffset(), encodingSchemeKv.getValueLength()));
        Cell useStatsForParallelizationKv = tableKeyValues[USE_STATS_FOR_PARALLELIZATION_INDEX];
        Boolean useStatsForParallelization = useStatsForParallelizationKv == null ? null : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(useStatsForParallelizationKv.getValueArray(), useStatsForParallelizationKv.getValueOffset(), useStatsForParallelizationKv.getValueLength()));
        
        List<PColumn> columns = Lists.newArrayListWithExpectedSize(columnCount);
        List<PTable> indexes = Lists.newArrayList();
        List<PName> physicalTables = Lists.newArrayList();
        PName parentTableName = tableType == INDEX ? dataTableName : null;
        PName parentSchemaName = tableType == INDEX ? schemaName : null;
        EncodedCQCounter cqCounter =
                (!EncodedColumnsUtil.usesEncodedColumnNames(encodingScheme) || tableType == PTableType.VIEW) ? PTable.EncodedCQCounter.NULL_COUNTER
                        : new EncodedCQCounter();
        while (true) {
          results.clear();
          scanner.next(results);
          if (results.isEmpty()) {
              break;
          }
          Cell colKv = results.get(LINK_TYPE_INDEX);
          int colKeyLength = colKv.getRowLength();
          PName colName = newPName(colKv.getRowArray(), colKv.getRowOffset() + offset, colKeyLength-offset);
          int colKeyOffset = offset + colName.getBytes().length + 1;
          PName famName = newPName(colKv.getRowArray(), colKv.getRowOffset() + colKeyOffset, colKeyLength-colKeyOffset);
          if (isQualifierCounterKV(colKv)) {
              Integer value = PInteger.INSTANCE.getCodec().decodeInt(colKv.getValueArray(), colKv.getValueOffset(), SortOrder.ASC);
              cqCounter.setValue(famName.getString(), value);
          } else if (Bytes.compareTo(LINK_TYPE_BYTES, 0, LINK_TYPE_BYTES.length, colKv.getQualifierArray(), colKv.getQualifierOffset(), colKv.getQualifierLength())==0) {    
              LinkType linkType = LinkType.fromSerializedValue(colKv.getValueArray()[colKv.getValueOffset()]);
              if (linkType == LinkType.INDEX_TABLE) {
                  addIndexToTable(tenantId, schemaName, famName, tableName, clientTimeStamp, indexes, clientVersion);
              } else if (linkType == LinkType.PHYSICAL_TABLE) {
                  physicalTables.add(famName);
              } else if (linkType == LinkType.PARENT_TABLE) {
                  parentTableName = PNameFactory.newName(SchemaUtil.getTableNameFromFullName(famName.getBytes()));
                  parentSchemaName = PNameFactory.newName(SchemaUtil.getSchemaNameFromFullName(famName.getBytes()));
              }
          } else {
              addColumnToTable(results, colName, famName, colKeyValues, columns, saltBucketNum != null);
          }
        }
        // Avoid querying the stats table because we're holding the rowLock here. Issuing an RPC to a remote
        // server while holding this lock is a bad idea and likely to cause contention.
        return PTableImpl.makePTable(tenantId, schemaName, tableName, tableType, indexState, timeStamp, tableSeqNum,
                pkName, saltBucketNum, columns, parentSchemaName, parentTableName, indexes, isImmutableRows, physicalTables, defaultFamilyName,
                viewStatement, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType,
                rowKeyOrderOptimizable, transactionProvider, updateCacheFrequency, baseColumnCount,
                indexDisableTimestamp, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema, storageScheme, encodingScheme, cqCounter, useStatsForParallelization);
    }
    
    /**
     * Tells whether the column qualifier of the given cell is byte-for-byte equal to the
     * qualifier of {@code QUALIFIER_COUNTER_KV}.
     *
     * @param kv the cell whose qualifier is inspected
     * @return {@code true} when both qualifiers compare as equal
     */
    private boolean isQualifierCounterKV(Cell kv) {
        return Bytes.compareTo(
                kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(),
                QUALIFIER_COUNTER_KV.getQualifierArray(), QUALIFIER_COUNTER_KV.getQualifierOffset(),
                QUALIFIER_COUNTER_KV.getQualifierLength()) == 0;
    }
    
    /**
     * Reads the next row from {@code scanner} and turns it into a {@link PSchema}.
     * <p>
     * The row key of the first returned cell is parsed in two steps: a leading name (used only
     * for its length, as the tenant id part) followed, one byte later, by the schema name.
     * The schema's timestamp is the timestamp of that first cell.
     *
     * @param scanner         scanner positioned on the schema row
     * @param clientTimeStamp not used by this method
     * @return the schema, or {@code null} when the scanner produced no cells
     */
    private PSchema getSchema(RegionScanner scanner, long clientTimeStamp) throws IOException, SQLException {
        List<Cell> cells = Lists.newArrayList();
        scanner.next(cells);
        if (cells.isEmpty()) {
            return null;
        }

        Cell firstCell = cells.get(0);
        byte[] rowBytes = firstCell.getRowArray();
        int rowOffset = firstCell.getRowOffset();
        int rowLength = firstCell.getRowLength();

        // Only the byte length of the leading (tenant id) component is needed here.
        PName tenantPart = newPName(rowBytes, rowOffset, rowLength);
        int tenantIdLength = tenantPart == null ? 0 : tenantPart.getBytes().length;

        // The schema name starts one byte past the tenant id component.
        int schemaStart = tenantIdLength + 1;
        PName schemaName = newPName(rowBytes, rowOffset + schemaStart, rowLength - schemaStart);
        return new PSchema(schemaName.getString(), firstCell.getTimestamp());
    }

    /**
     * Builds a {@link PFunction} from the rows produced by {@code scanner}.
     * <p>
     * The first row is the function header row: its cells are matched, in qualifier order,
     * against {@code FUNCTION_KV_COLUMNS}. Each following row (at most NUM_ARGS of them)
     * describes one argument and is handed to {@code addArgumentToFunction}. The resulting
     * arguments are sorted by argument position.
     * <p>
     * When {@code isReplace} is set, a {@link Delete} for the header row and for every argument
     * row read is appended to {@code deleteMutationsForReplace}.
     *
     * @param scanner                   scanner positioned on the function header row
     * @param isReplace                 whether delete mutations for the existing rows should be collected
     * @param clientTimeStamp           client timestamp, used only to choose the delete timestamp
     * @param deleteMutationsForReplace receives the generated deletes when {@code isReplace} is true
     * @return the function, or {@code null} when the scanner produced no cells for the header row
     * @throws IllegalStateException when the CLASS_NAME or NUM_ARGS cell is missing from the header row
     */
    private PFunction getFunction(RegionScanner scanner, final boolean isReplace, long clientTimeStamp, List<Mutation> deleteMutationsForReplace)
            throws IOException, SQLException {
        List<Cell> results = Lists.newArrayList();
        scanner.next(results);
        if (results.isEmpty()) {
            return null;
        }
        Cell[] functionKeyValues = new Cell[FUNCTION_KV_COLUMNS.size()];
        Cell[] functionArgKeyValues = new Cell[FUNCTION_ARG_KV_COLUMNS.size()];
        // Create PFunction based on KeyValues from scan
        Cell keyValue = results.get(0);
        byte[] keyBuffer = keyValue.getRowArray();
        int keyLength = keyValue.getRowLength();
        int keyOffset = keyValue.getRowOffset();
        long currentTimeMillis = EnvironmentEdgeManager.currentTimeMillis();
        if (isReplace) {
            // Delete just below the client timestamp when the row is older than it; otherwise
            // delete at the row's own timestamp. With LATEST_TIMESTAMP, use "now - 1".
            long deleteTimeStamp =
                    clientTimeStamp == HConstants.LATEST_TIMESTAMP ? currentTimeMillis - 1
                            : (keyValue.getTimestamp() < clientTimeStamp ? clientTimeStamp - 1
                                    : keyValue.getTimestamp());
            deleteMutationsForReplace.add(new Delete(keyBuffer, keyOffset, keyLength, deleteTimeStamp));
        }
        // Only the byte length of the leading (tenant id) component is used for key parsing;
        // the PName itself is passed to the PFunction unless it is empty.
        PName tenantId = newPName(keyBuffer, keyOffset, keyLength);
        int tenantIdLength = (tenantId == null) ? 0 : tenantId.getBytes().length;
        if (tenantIdLength == 0) {
            tenantId = null;
        }
        PName functionName =
                newPName(keyBuffer, keyOffset + tenantIdLength + 1, keyLength - tenantIdLength - 1);
        int functionNameLength = functionName.getBytes().length + 1;
        // Offset, within an argument row key, of the component that follows the function name.
        int offset = tenantIdLength + functionNameLength + 1;

        long timeStamp = keyValue.getTimestamp();

        // Merge-walk the returned cells (index i) against the expected columns (index j);
        // both are assumed to be ordered by qualifier.
        int i = 0;
        int j = 0;
        while (i < results.size() && j < FUNCTION_KV_COLUMNS.size()) {
            Cell kv = results.get(i);
            Cell searchKv = FUNCTION_KV_COLUMNS.get(j);
            int cmp =
                    Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(),
                        kv.getQualifierLength(), searchKv.getQualifierArray(),
                        searchKv.getQualifierOffset(), searchKv.getQualifierLength());
            if (cmp == 0) {
                // Track the max timestamp across the header row cells
                timeStamp = Math.max(timeStamp, kv.getTimestamp());
                functionKeyValues[j++] = kv;
                i++;
            } else if (cmp > 0) {
                // Expected column is absent from the row: record it as missing
                timeStamp = Math.max(timeStamp, kv.getTimestamp());
                functionKeyValues[j++] = null;
            } else {
                i++; // shouldn't happen - means unexpected KV in system table header row
            }
        }
        // CLASS_NAME and NUM_ARGS are required; JAR_PATH and RETURN_TYPE may be absent.
        if (functionKeyValues[CLASS_NAME_INDEX] == null || functionKeyValues[NUM_ARGS_INDEX] == null) {
            throw new IllegalStateException(
                    "Didn't find expected key values for function row in metadata row");
        }

        Cell classNameKv = functionKeyValues[CLASS_NAME_INDEX];
        PName className = newPName(classNameKv.getValueArray(), classNameKv.getValueOffset(),
            classNameKv.getValueLength());
        Cell jarPathKv = functionKeyValues[JAR_PATH_INDEX];
        PName jarPath = null;
        if (jarPathKv != null) {
            jarPath = newPName(jarPathKv.getValueArray(), jarPathKv.getValueOffset(),
                jarPathKv.getValueLength());
        }
        Cell numArgsKv = functionKeyValues[NUM_ARGS_INDEX];
        int numArgs =
                PInteger.INSTANCE.getCodec().decodeInt(numArgsKv.getValueArray(),
                    numArgsKv.getValueOffset(), SortOrder.getDefault());
        Cell returnTypeKv = functionKeyValues[RETURN_TYPE_INDEX];
        PName returnType =
                returnTypeKv == null ? null : newPName(returnTypeKv.getValueArray(),
                    returnTypeKv.getValueOffset(), returnTypeKv.getValueLength());

        List<FunctionArgument> arguments = Lists.newArrayListWithExpectedSize(numArgs);
        for (int k = 0; k < numArgs; k++) {
            results.clear();
            scanner.next(results);
            if (results.isEmpty()) {
                // Fewer argument rows than NUM_ARGS announced: stop with what was read
                break;
            }
            Cell typeKv = results.get(0);
            if (isReplace) {
                long deleteTimeStamp =
                        clientTimeStamp == HConstants.LATEST_TIMESTAMP ? currentTimeMillis - 1
                                : (typeKv.getTimestamp() < clientTimeStamp ? clientTimeStamp - 1
                                        : typeKv.getTimestamp());
                deleteMutationsForReplace.add(new Delete(typeKv.getRowArray(), typeKv
                        .getRowOffset(), typeKv.getRowLength(), deleteTimeStamp));
            }
            int typeKeyLength = typeKv.getRowLength();
            // NOTE(review): the trailing 3 bytes are presumably a separator plus the 2-byte
            // argument position read below - confirm against the row key layout.
            PName typeName =
                    newPName(typeKv.getRowArray(), typeKv.getRowOffset() + offset, typeKeyLength
                            - offset - 3);

            int argPositionOffset = offset + typeName.getBytes().length + 1;
            short argPosition = Bytes.toShort(typeKv.getRowArray(), typeKv.getRowOffset() + argPositionOffset, typeKeyLength
                - argPositionOffset);
            addArgumentToFunction(results, functionName, typeName, functionArgKeyValues, arguments, argPosition);
        }
        Collections.sort(arguments, new Comparator<FunctionArgument>() {
            @Override
            public int compare(FunctionArgument o1, FunctionArgument o2) {
                return o1.getArgPosition() - o2.getArgPosition();
            }
        });
        // returnType is null when the RETURN_TYPE cell is absent; guard it like jarPath instead
        // of dereferencing it unconditionally.
        return new PFunction(tenantId, functionName.getString(), arguments,
                returnType == null ? null : returnType.getString(),
                className.getString(), jarPath == null ? null : jarPath.getString(), timeStamp);
    }
    
    /**
     * Looks for a {@code DeleteFamily} marker on the table row and, when one is found, caches
     * and returns a deleted-table marker carrying the delete's timestamp.
     * <p>
     * A raw scan restricted to the first key of the row is used. Nothing is looked up when the
     * client timestamp is {@link HConstants#LATEST_TIMESTAMP}.
     *
     * @param key             row key of the table header row
     * @param cacheKey        key under which the marker is stored in the metadata cache
     * @param region          region to scan
     * @param clientTimeStamp lower bound of the scan's time range
     * @return the cached deleted-table marker, or {@code null} when no {@code DeleteFamily}
     *         cell was returned
     */
    private PTable buildDeletedTable(byte[] key, ImmutableBytesPtr cacheKey, Region region,
        long clientTimeStamp) throws IOException {
        if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) {
            return null;
        }

        Scan rawScan = MetaDataUtil.newTableRowsScan(key, clientTimeStamp, HConstants.LATEST_TIMESTAMP);
        rawScan.setFilter(new FirstKeyOnlyFilter());
        rawScan.setRaw(true);
        List<Cell> cells = Lists.<Cell> newArrayList();
        try (RegionScanner rowScanner = region.getScanner(rawScan)) {
            rowScanner.next(cells);
        }

        for (Cell cell : cells) {
            KeyValue.Type cellType = Type.codeToType(cell.getTypeByte());
            if (cellType != Type.DeleteFamily) {
                continue;
            }
            // Row was deleted: remember that fact in the cache at the delete's timestamp.
            Cache<ImmutableBytesPtr, PMetaDataEntity> cache =
                    GlobalCache.getInstance(this.env).getMetaDataCache();
            PTable deletedMarker = newDeletedTableMarker(cell.getTimestamp());
            cache.put(cacheKey, deletedMarker);
            return deletedMarker;
        }
        return null;
    }

    
    /**
     * Checks whether the function row carries a {@code Delete} marker newer than the client
     * timestamp and, if so, caches and returns a deleted-function marker for it.
     * <p>
     * Only the first cell returned by a raw, first-key-only scan is examined. Nothing is looked
     * up when the client timestamp is {@link HConstants#LATEST_TIMESTAMP}.
     *
     * @param key             row key of the function header row
     * @param cacheKey        key under which the marker is stored in the metadata cache
     * @param region          region to scan
     * @param clientTimeStamp timestamp the delete must be newer than
     * @return the cached deleted-function marker, or {@code null} when no such delete was found
     */
    private PFunction buildDeletedFunction(byte[] key, ImmutableBytesPtr cacheKey, Region region,
        long clientTimeStamp) throws IOException {
        if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) {
            return null;
        }

        Scan rawScan = MetaDataUtil.newTableRowsScan(key, clientTimeStamp, HConstants.LATEST_TIMESTAMP);
        rawScan.setFilter(new FirstKeyOnlyFilter());
        rawScan.setRaw(true);
        List<Cell> cells = Lists.<Cell> newArrayList();
        try (RegionScanner rowScanner = region.getScanner(rawScan)) {
            rowScanner.next(cells);
        }
        if (cells.isEmpty()) {
            return null;
        }

        Cell first = cells.get(0);
        // HBase ignores the time range on a raw scan (HBASE-7362), so re-check the timestamp here
        if (first.getTimestamp() <= clientTimeStamp) {
            return null;
        }
        if (first.getTypeByte() != Type.Delete.getCode()) {
            return null;
        }
        Cache<ImmutableBytesPtr, PMetaDataEntity> cache =
                GlobalCache.getInstance(this.env).getMetaDataCache();
        PFunction deletedMarker = newDeletedFunctionMarker(first.getTimestamp());
        cache.put(cacheKey, deletedMarker);
        return deletedMarker;
    }

    /**
     * Checks whether the schema row carries a {@code Delete} marker newer than the client
     * timestamp and, if so, caches and returns a deleted-schema marker for it.
     * <p>
     * Only the first cell returned by a raw, first-key-only scan is examined. Nothing is looked
     * up when the client timestamp is {@link HConstants#LATEST_TIMESTAMP}.
     *
     * @param key             row key of the schema row
     * @param cacheKey        key under which the marker is stored in the metadata cache
     * @param region          region to scan
     * @param clientTimeStamp timestamp the delete must be newer than
     * @return the cached deleted-schema marker, or {@code null} when no such delete was found
     */
    private PSchema buildDeletedSchema(byte[] key, ImmutableBytesPtr cacheKey, Region region, long clientTimeStamp)
            throws IOException {
        if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) {
            return null;
        }

        Scan rawScan = MetaDataUtil.newTableRowsScan(key, clientTimeStamp, HConstants.LATEST_TIMESTAMP);
        rawScan.setFilter(new FirstKeyOnlyFilter());
        rawScan.setRaw(true);
        List<Cell> cells = Lists.<Cell> newArrayList();
        try (RegionScanner rowScanner = region.getScanner(rawScan)) {
            rowScanner.next(cells);
        }
        if (cells.isEmpty()) {
            return null;
        }

        Cell first = cells.get(0);
        // HBase ignores the time range on a raw scan (HBASE-7362), so re-check the timestamp here
        if (first.getTimestamp() <= clientTimeStamp) {
            return null;
        }
        if (first.getTypeByte() != Type.Delete.getCode()) {
            return null;
        }
        Cache<ImmutableBytesPtr, PMetaDataEntity> cache =
                GlobalCache.getInstance(this.env).getMetaDataCache();
        PSchema deletedMarker = newDeletedSchemaMarker(first.getTimestamp());
        cache.put(cacheKey, deletedMarker);
        return deletedMarker;
    }

    /**
     * Creates the {@link PTable} placeholder that is cached for a deleted table.
     *
     * @param timestamp timestamp passed to the {@link PTableImpl} constructor
     * @return a new {@link PTableImpl} built from the timestamp alone
     */
    private static PTable newDeletedTableMarker(long timestamp) {
        PTable deletedMarker = new PTableImpl(timestamp);
        return deletedMarker;
    }

    /**
     * Creates the {@link PFunction} placeholder that is cached for a deleted function.
     *
     * @param timestamp timestamp passed to the {@link PFunction} constructor
     * @return a new {@link PFunction} built from the timestamp alone
     */
    private static PFunction newDeletedFunctionMarker(long timestamp) {
        PFunction deletedMarker = new PFunction(timestamp);
        return deletedMarker;
    }

    /**
     * Creates the {@link PSchema} placeholder that is cached for a deleted schema.
     *
     * @param timestamp timestamp passed to the {@link PSchema} constructor
     * @return a new {@link PSchema} built from the timestamp alone
     */
    private static PSchema newDeletedSchemaMarker(long timestamp) {
        PSchema deletedMarker = new PSchema(timestamp);
        return deletedMarker;
    }

    /**
     * Reports whether the given table is a deleted-table marker, which is recognised by the
     * table having no name.
     *
     * @param table the table to test; must not be {@code null}
     * @return {@code true} when {@code table.getName()} is {@code null}
     */
    private static boolean isTableDeleted(PTable table) {
        final boolean nameMissing = table.getName() == null;
        return nameMissing;
    }

    /**
     * Reports whether the given schema is a deleted-schema marker, which is recognised by the
     * schema having no name.
     *
     * @param schema the schema to test; must not be {@code null}
     * @return {@code true} when {@code schema.getSchemaName()} is {@code null}
     */
    private static boolean isSchemaDeleted(PSchema schema) {
        final boolean nameMissing = schema.getSchemaName() == null;
        return nameMissing;
    }

    /**
     * Reports whether the given function is a deleted-function marker, which is recognised by
     * the function having no name.
     *
     * @param function the function to test; must not be {@code null}
     * @return {@code true} when {@code function.getFunctionName()} is {@code null}
     */
    private static boolean isFunctionDeleted(PFunction function) {
        final boolean nameMissing = function.getFunctionName() == null;
        return nameMissing;
    }

    /**
     * Loads a table through {@code loadTable} and hides deleted-table markers from the caller.
     *
     * @param env             coprocessor environment supplying the region
     * @param key             row key of the table header row
     * @param cacheKey        metadata cache key for the table
     * @param clientTimeStamp client timestamp forwarded to {@code loadTable}
     * @param asOfTimeStamp   as-of timestamp forwarded to {@code loadTable}
     * @param clientVersion   client version forwarded to {@code loadTable}
     * @return the table, or {@code null} when nothing was loaded or the loaded entry is a
     *         deleted-table marker
     */
    private PTable getTable(RegionCoprocessorEnvironment env, byte[] key, ImmutableBytesPtr cacheKey,
            long clientTimeStamp, long asOfTimeStamp, int clientVersion) throws IOException, SQLException {
        PTable loaded = loadTable(env, key, cacheKey, clientTimeStamp, asOfTimeStamp, clientVersion);
        boolean usable = loaded != null && !isTableDeleted(loaded);
        return usable ? loaded : null;
    }

    /**
     * Resolves a table in three steps, returning the first hit:
     * <ol>
     * <li>the entry already present in the metadata cache,</li>
     * <li>the table built by {@code buildTable} as of {@code asOfTimeStamp},</li>
     * <li>a deleted-table marker built by {@code buildDeletedTable} for {@code clientTimeStamp}.</li>
     * </ol>
     * The result may therefore be a deleted-table marker; callers that must not see one should
     * test it with {@code isTableDeleted}.
     *
     * @param env             coprocessor environment supplying the region
     * @param key             row key of the table header row
     * @param cacheKey        metadata cache key for the table
     * @param clientTimeStamp timestamp used when looking for a delete marker
     * @param asOfTimeStamp   timestamp used when building the table
     * @param clientVersion   client version forwarded to {@code buildTable}
     * @return the table or marker found, or {@code null} when all three steps come up empty
     */
    private PTable loadTable(RegionCoprocessorEnvironment env, byte[] key,
        ImmutableBytesPtr cacheKey, long clientTimeStamp, long asOfTimeStamp, int clientVersion)
        throws IOException, SQLException {
        Region region = env.getRegion();
        Cache<ImmutableBytesPtr, PMetaDataEntity> cache = GlobalCache.getInstance(this.env).getMetaDataCache();

        // We always cache the latest version - fault in if not in cache
        PTable cached = (PTable) cache.getIfPresent(cacheKey);
        if (cached != null) {
            return cached;
        }
        PTable built = buildTable(key, cacheKey, region, asOfTimeStamp, clientVersion);
        if (built != null) {
            return built;
        }
        // Not found: check whether a newer delete exists and, if so, hand back its marker
        return buildDeletedTable(key, cacheKey, region, clientTimeStamp);
    }

    /**
     * Resolves a function from the metadata cache or, failing that, from the region.
     * <p>
     * A cached entry is returned directly unless {@code isReplace} is set. Otherwise the
     * function is rebuilt through {@code buildFunctions}; when that yields nothing and there was
     * no cached entry, {@code buildDeletedFunction} is consulted for a delete marker.
     *
     * @param env                       coprocessor environment supplying the region
     * @param key                       row key of the function header row
     * @param cacheKey                  metadata cache key for the function
     * @param clientTimeStamp           timestamp used when looking for a delete marker
     * @param asOfTimeStamp             timestamp used when building the function
     * @param isReplace                 when true, a cached entry is not returned directly
     * @param deleteMutationsForReplace forwarded to {@code buildFunctions}
     * @return the function or deleted-function marker found, or {@code null}
     */
    private PFunction loadFunction(RegionCoprocessorEnvironment env, byte[] key,
            ImmutableBytesPtr cacheKey, long clientTimeStamp, long asOfTimeStamp, boolean isReplace, List<Mutation> deleteMutationsForReplace)
            throws IOException, SQLException {
        Region region = env.getRegion();
        Cache<ImmutableBytesPtr, PMetaDataEntity> cache = GlobalCache.getInstance(this.env).getMetaDataCache();

        // We always cache the latest version - fault in if not in cache
        PFunction cached = (PFunction) cache.getIfPresent(cacheKey);
        if (cached != null && !isReplace) {
            return cached;
        }

        ArrayList<byte[]> keys = new ArrayList<byte[]>(1);
        keys.add(key);
        List<PFunction> built = buildFunctions(keys, region, asOfTimeStamp, isReplace, deleteMutationsForReplace);
        if (built != null) {
            return built.get(0);
        }

        // A cached entry exists (replace case) but nothing could be rebuilt: report not found.
        if (cached != null) {
            return null;
        }
        // Not found: check whether a newer delete exists and, if so, hand back its marker
        return buildDeletedFunction(key, cacheKey, region, clientTimeStamp);
    }

    /**
     * Resolves a schema in three steps, returning the first hit: the metadata cache entry, the
     * schema built by {@code buildSchemas} as of {@code asOfTimeStamp}, or a deleted-schema
     * marker built by {@code buildDeletedSchema} for {@code clientTimeStamp}.
     *
     * @param env             coprocessor environment supplying the region
     * @param key             row key of the schema row
     * @param cacheKey        metadata cache key for the schema
     * @param clientTimeStamp timestamp used when looking for a delete marker
     * @param asOfTimeStamp   timestamp used when building the schema
     * @return the schema or deleted-schema marker found, or {@code null}
     */
    private PSchema loadSchema(RegionCoprocessorEnvironment env, byte[] key, ImmutableBytesPtr cacheKey,
            long clientTimeStamp, long asOfTimeStamp) throws IOException, SQLException {
        Region region = env.getRegion();
        Cache<ImmutableBytesPtr, PMetaDataEntity> cache = GlobalCache.getInstance(this.env).getMetaDataCache();

        // We always cache the latest version - fault in if not in cache
        PSchema cached = (PSchema) cache.getIfPresent(cacheKey);
        if (cached != null) {
            return cached;
        }

        ArrayList<byte[]> keys = new ArrayList<byte[]>(1);
        keys.add(key);
        List<PSchema> built = buildSchemas(keys, region, asOfTimeStamp, cacheKey);
        if (built != null) {
            return built.get(0);
        }
        // Not found: check whether a newer delete exists and, if so, hand back its marker
        return buildDeletedSchema(key, cacheKey, region, clientTimeStamp);
    }

    /**
     * Locates the link rows of a view among its metadata mutations and extracts the names they
     * carry.
     * <p>
     * Mutations are examined from the last one down to index 1 (index 0 is never inspected).
     * Only {@link Put}s are considered, and the walk stops as soon as both a
     * {@code LinkType.PHYSICAL_TABLE} and a {@code LinkType.PARENT_TABLE} link have been seen.
     * For each kind of link, the matching output array is filled through
     * {@code getSchemaTableNames} when the link was found, and has its first three slots set to
     * {@code null} otherwise.
     *
     * @param tableMetadata                mutations describing the view
     * @param parentTenantSchemaTableNames output: tenant id, schema and table name taken from the
     *                                     parent-table link row
     * @param physicalSchemaTableNames     output: tenant id, schema and table name taken from the
     *                                     physical-table link row
     * @return the physical-table link row, or {@code null} if the physical table row
     *         information is not present
     */
    private static Mutation getPhysicalTableRowForView(List<Mutation> tableMetadata, byte[][] parentTenantSchemaTableNames, byte[][] physicalSchemaTableNames) {
        int mutationCount = tableMetadata.size();
        // The parsed header key is not used below; the call is kept for its validation effect.
        byte[][] headerKeyParts = new byte[3][];
        MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, headerKeyParts);

        Mutation physicalLinkRow = null;
        Mutation parentLinkRow = null;
        for (int idx = mutationCount - 1; idx >= 1; idx--) {
            Mutation candidate = tableMetadata.get(idx);
            if (candidate instanceof Put) {
                LinkType linkType = MetaDataUtil.getLinkType(candidate);
                if (linkType == LinkType.PHYSICAL_TABLE) {
                    physicalLinkRow = candidate;
                }
                if (linkType == LinkType.PARENT_TABLE) {
                    parentLinkRow = candidate;
                }
            }
            if (physicalLinkRow != null && parentLinkRow != null) {
                break;
            }
        }

        if (parentLinkRow == null) {
            for (int slot = 0; slot < 3; slot++) {
                parentTenantSchemaTableNames[slot] = null;
            }
        }
        if (physicalLinkRow == null) {
            for (int slot = 0; slot < 3; slot++) {
                physicalSchemaTableNames[slot] = null;
            }
        } else {
            getSchemaTableNames(physicalLinkRow, physicalSchemaTableNames);
        }
        if (parentLinkRow != null) {
            getSchemaTableNames(parentLinkRow, parentTenantSchemaTableNames);
        }
        return physicalLinkRow;
    }
    
    /**
     * Splits the row key of a link row into its var-char components and, when the column name
     * component is empty and the family name component is not, stores the tenant id together
     * with the schema and table names derived from the family name component.
     * <p>
     * When that condition does not hold, {@code schemaTableNames} is left untouched.
     *
     * @param row              mutation whose row key is parsed
     * @param schemaTableNames output array; slots 0, 1 and 2 receive the tenant id, schema name
     *                         and table name respectively
     */
    private static void getSchemaTableNames(Mutation row, byte[][] schemaTableNames) {
        byte[][] keyParts = new byte[5][];
        getVarChars(row.getRow(), 5, keyParts);

        byte[] columnName = keyParts[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX];
        if (columnName != null && columnName.length != 0) {
            return;
        }
        byte[] fullName = keyParts[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX];
        if (fullName == null || fullName.length == 0) {
            return;
        }

        // The family name component holds the full name of the linked table.
        byte[] schemaBytes = SchemaUtil.getSchemaNameFromFullName(fullName).getBytes();
        byte[] tableBytes = SchemaUtil.getTableNameFromFullName(fullName).getBytes();
        schemaTableNames[0] = keyParts[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
        schemaTableNames[1] = schemaBytes;
        schemaTableNames[2] = tableBytes;
    }
    
    @Override
    public void createTable(RpcController controller, CreateTableRequest request,
            RpcCallback<MetaDataResponse> done) {
        MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
        byte[][] rowKeyMetaData = new byte[3][];
        byte[] schemaName = null;
        byte[] tableName = null;
        try {
            int clientVersion = request.getClientVersion();
            List<Mutation> tableMetadata = ProtobufUtil.getMutations(request);
            MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
            byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
            schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
            tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
            boolean isNamespaceMapped = MetaDataUtil.isNameSpaceMapped(tableMetadata, GenericKeyValueBuilder.INSTANCE,
                    new ImmutableBytesWritable());
            final IndexType indexType = MetaDataUtil.getIndexType(tableMetadata, GenericKeyValueBuilder.INSTANCE,
                    new ImmutableBytesWritable());
            byte[] parentSchemaName = null;
            byte[] parentTableName = null;
            PTableType tableType = MetaDataUtil.getTableType(tableMetadata, GenericKeyValueBuilder.INSTANCE, new ImmutableBytesWritable());
            byte[] parentTableKey = null;
            Mutation viewPhysicalTableRow = null;
            Set<TableName> indexes = new HashSet<TableName>();;
            byte[] cPhysicalName = SchemaUtil.getPhysicalHBaseTableName(schemaName, tableName, isNamespaceMapped)
                    .getBytes();
            byte[] cParentPhysicalName=null;
            if (tableType == PTableType.VIEW) {
                byte[][] parentSchemaTableNames = new byte[3][];
                byte[][] parentPhysicalSchemaTableNames = new byte[3][];
                /*
                 * For a view, we lock the base physical table row. For a mapped view, there is 
                 * no link present to the physical table. So the viewPhysicalTableRow is null
                 * in that case.
                 */
                
                viewPhysicalTableRow = getPhysicalTableRowForView(tableMetadata, parentSchemaTableNames,parentPhysicalSchemaTableNames);
                long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
                if (parentPhysicalSchemaTableNames[2] != null) {
                    
                    parentTableKey = SchemaUtil.getTableKey(ByteUtil.EMPTY_BYTE_ARRAY,
                            parentPhysicalSchemaTableNames[1], parentPhysicalSchemaTableNames[2]);
                    PTable parentTable = getTable(env, parentTableKey, new ImmutableBytesPtr(parentTableKey),
                            clientTimeStamp, clientTimeStamp, clientVersion);
                    if (parentTable == null) {
                        builder.setReturnCode(MetaDataProtos.MutationCode.PARENT_TABLE_NOT_FOUND);
                        builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                        done.run(builder.build());
                        return;
                    }
                    cParentPhysicalName = parentTable.getPhysicalName().getBytes();
                    if (parentSchemaTableNames[2] != null
                            && Bytes.compareTo(parentSchemaTableNames[2], parentPhysicalSchemaTableNames[2]) != 0) {
                        // if view is created on view
                        byte[] parentKey = SchemaUtil.getTableKey(
                                parentSchemaTableNames[0] == null ? ByteUtil.EMPTY_BYTE_ARRAY : parentSchemaTableNames[0],
                                parentSchemaTableNames[1], parentSchemaTableNames[2]);
                        parentTable = getTable(env, parentKey, new ImmutableBytesPtr(parentKey),
                                clientTimeStamp, clientTimeStamp, clientVersion);
                        if (parentTable == null) {
                            // it could be a global view
                            parentKey = SchemaUtil.getTableKey(ByteUtil.EMPTY_BYTE_ARRAY,
                                    parentSchemaTableNames[1], parentSchemaTableNames[2]);
                            parentTable = getTable(env, parentKey, new ImmutableBytesPtr(parentKey),
                                    clientTimeStamp, clientTimeStamp, clientVersion);
                        }
                    }
                    if (parentTable == null) {
                        builder.setReturnCode(MetaDataProtos.MutationCode.PARENT_TABLE_NOT_FOUND);
                        builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                        done.run(builder.build());
                        return;
                    }
                    for (PTable index : parentTable.getIndexes()) {
                        indexes.add(TableName.valueOf(index.getPhysicalName().getBytes()));
                    }
                } else {
                    // Mapped View
                    cParentPhysicalName = SchemaUtil.getTableNameAsBytes(schemaName, tableName);
                }
                parentSchemaName = parentPhysicalSchemaTableNames[1];
                parentTableName = parentPhysicalSchemaTableNames[2];
                    
            } else if (tableType == PTableType.INDEX) {
                parentSchemaName = schemaName;
                /* 
                 * For an index we lock the parent table's row which could be a physical table or a view.
                 * If the parent table is a physical table, then the tenantIdBytes is empty because
                 * we allow creating an index with a tenant connection only if the parent table is a view.
                 */ 
                parentTableName = MetaDataUtil.getParentTableName(tableMetadata);
                parentTableKey = SchemaUtil.getTableKey(tenantIdBytes, parentSchemaName, parentTableName);
                long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
                PTable parentTable = loadTable(env, parentTableKey, new ImmutableBytesPtr(parentTableKey),
                        clientTimeStamp, clientTimeStamp, clientVersion);
                if (IndexType.LOCAL == indexType) {
                    cPhysicalName = parentTable.getPhysicalName().getBytes();
                    cParentPhysicalName=parentTable.getPhysicalName().getBytes();
                } else if (parentTable.getType() == PTableType.VIEW) {
                    cPhysicalName = MetaDataUtil.getViewIndexPhysicalName(parentTable.getPhysicalName().getBytes());
                    cParentPhysicalName = parentTable.getPhysicalName().getBytes();
                }else{
                    cParentPhysicalName = SchemaUtil
                            .getPhysicalHBaseTableName(parentSchemaName, parentTableName, isNamespaceMapped).getBytes();
                }
            }
            
            getCoprocessorHost().preCreateTable(Bytes.toString(tenantIdBytes),
                    SchemaUtil.getTableName(schemaName, tableName),
                    (tableType == PTableType.VIEW) ? null : TableName.valueOf(cPhysicalName),
                    cParentPhysicalName == null ? null : TableName.valueOf(cParentPhysicalName), tableType,
                    /* TODO: During inital create we may not need the family map */
                    Collections.<byte[]> emptySet(), indexes);

            Region region = env.getRegion();
            List<RowLock> locks = Lists.newArrayList();
            // Place a lock using key for the table to be created
            byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaName, tableName);
            try {
                acquireLock(region, tableKey, locks);

                // If the table key resides outside the region, return without doing anything
                MetaDataMutationResult result = checkTableKeyInRegion(tableKey, region);
                if (result != null) {
                    done.run(MetaDataMutationResult.toProto(result));
                    return;
                }

                long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
                ImmutableBytesPtr parentCacheKey = null;
                PTable parentTable = null;
                if (parentTableName != null) {
                    // Check if the parent table resides in the same region. If not, don't worry about locking the parent table row
                    // or loading the parent table. For a view, the parent table that needs to be locked is the base physical table.
                    // For an index on view, the view header row needs to be locked. 
                    result = checkTableKeyInRegion(parentTableKey, region);
                    if (result == null) {
                        acquireLock(region, parentTableKey, locks);
                        parentCacheKey = new ImmutableBytesPtr(parentTableKey);
                        parentTable = loadTable(env, parentTableKey, parentCacheKey, clientTimeStamp,
                                clientTimeStamp, clientVersion);
                        if (parentTable == null || isTableDeleted(parentTable)) {
                            builder.setReturnCode(MetaDataProtos.MutationCode.PARENT_TABLE_NOT_FOUND);
                            builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                            done.run(builder.build());
                            return;
                        }
                        // make sure we haven't gone over our threshold for indexes on this table.
                        if (execeededIndexQuota(tableType, parentTable)) {
                            builder.setReturnCode(MetaDataProtos.MutationCode.TOO_MANY_INDEXES);
                            builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                            done.run(builder.build());
                            return;
                        }
                        long parentTableSeqNumber;
                        if (tableType == PTableType.VIEW && viewPhysicalTableRow != null && request.hasClientVersion()) {
                            // Starting 4.5, the client passes the sequence number of the physical table in the table metadata.
                            parentTableSeqNumber = MetaDataUtil.getSequenceNumber(viewPhysicalTableRow);
                        } else if (tableType == PTableType.VIEW && !request.hasClientVersion()) {
                            // Before 4.5, due to a bug, the parent table key wasn't available.
                            // So don't do anything and prevent the exception from being thrown.
                            parentTableSeqNumber = parentTable.getSequenceNumber();
                        } else {
                            parentTableSeqNumber = MetaDataUtil.getParentSequenceNumber(tableMetadata);
                        }
                        // If parent table isn't at the expected sequence number, then return
                        if (parentTable.getSequenceNumber() != parentTableSeqNumber) {
                            builder.setReturnCode(MetaDataProtos.MutationCode.CONCURRENT_TABLE_MUTATION);
                            builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                            builder.setTable(PTableImpl.toProto(parentTable));
                            done.run(builder.build());
                            return;
                        }
                    }
                }
                // Load child table next
                ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(tableKey);
                // Get as of latest timestamp so we can detect if we have a newer table that already
                // exists without making an additional query
                PTable table =
                        loadTable(env, tableKey, cacheKey, clientTimeStamp, HConstants.LATEST_TIMESTAMP, clientVersion);
                if (table != null) {
                    if (table.getTimeStamp() < clientTimeStamp) {
                        // If the table is older than the client time stamp and it's deleted,
                        // continue
                        if (!isTableDeleted(table)) {
                            builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS);
                            builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                            builder.setTable(PTableImpl.toProto(table));
                            done.run(builder.build());
                            return;
                        }
                    } else {
                        builder.setReturnCode(MetaDataProtos.MutationCode.NEWER_TABLE_FOUND);
                        builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                        builder.setTable(PTableImpl.toProto(table));
                        done.run(builder.build());
                        return;
                    }
                }
                
                // Add cell for ROW_KEY_ORDER_OPTIMIZABLE = true, as we know that new tables
                // conform to the correct row key order. The exception is a VIEW, for which the
                // client sends the value over depending on its base physical table.
                if (tableType != PTableType.VIEW) {
                    UpgradeUtil.addRowKeyOrderOptimizableCell(tableMetadata, tableKey, clientTimeStamp);
                }
                // If the parent table of the view has the auto partition sequence name attribute, modify the 
                // tableMetadata and set the view statement and partition column correctly
                if (parentTable!=null && parentTable.getAutoPartitionSeqName()!=null) {
                    long autoPartitionNum = 1;
                    try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class);
                        Statement stmt = connection.createStatement()) {
                        String seqName = parentTable.getAutoPartitionSeqName();
                        // Not going through the standard route of using statement.execute() as that code path
                        // is blocked if the metadata hasn't been upgraded to the new minor release.
                        String seqNextValueSql = String.format("SELECT NEXT VALUE FOR %s", seqName);
                        PhoenixStatement ps = stmt.unwrap(PhoenixStatement.class);
                        QueryPlan plan = ps.compileQuery(seqNextValueSql);
                        ResultIterator resultIterator = plan.iterator();
                        PhoenixResultSet rs = ps.newResultSet(resultIterator, plan.getProjector(), plan.getContext());
                        rs.next();
                        autoPartitionNum = rs.getLong(1);
                    }
                    catch (SequenceNotFoundException e) {
                        builder.setReturnCode(MetaDataProtos.MutationCode.AUTO_PARTITION_SEQUENCE_NOT_FOUND);
                        builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                        done.run(builder.build());
                        return;
                    }
                    PColumn autoPartitionCol = parentTable.getPKColumns().get(MetaDataUtil.getAutoPartitionColIndex(parentTable));
                    if (!PLong.INSTANCE.isCoercibleTo(autoPartitionCol.getDataType(), autoPartitionNum)) {
                        builder.setReturnCode(MetaDataProtos.MutationCode.CANNOT_COERCE_AUTO_PARTITION_ID);
                        builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                        done.run(builder.build());
                        return;
                    }
                    builder.setAutoPartitionNum(autoPartitionNum);
                    
                    // set the VIEW STATEMENT column of the header row
                    Put tableHeaderPut = MetaDataUtil.getPutOnlyTableHeaderRow(tableMetadata);
                    NavigableMap<byte[], List<Cell>> familyCellMap = tableHeaderPut.getFamilyCellMap();
                    List<Cell> cells = familyCellMap.get(TABLE_FAMILY_BYTES);
                    Cell cell = cells.get(0);
                    String autoPartitionWhere = QueryUtil.getViewPartitionClause(MetaDataUtil.getAutoPartitionColumnName(parentTable), autoPartitionNum);
                    String hbaseVersion = VersionInfo.getVersion();
                    ImmutableBytesPtr ptr = new ImmutableBytesPtr();
                    KeyValueBuilder kvBuilder = KeyValueBuilder.get(hbaseVersion);
                    MetaDataUtil.getMutationValue(tableHeaderPut, VIEW_STATEMENT_BYTES, kvBuilder, ptr);
                    byte[] value = ptr.copyBytesIfNecessary();
                    byte[] viewStatement = null;
                    // if we have an existing where clause add the auto partition where clause to it
                    if (!Bytes.equals(value, QueryConstants.EMPTY_COLUMN_VALUE_BYTES)) {
                        viewStatement = Bytes.add(value, Bytes.toBytes(" AND "), Bytes.toBytes(autoPartitionWhere));
                    }
                    else { 
                        viewStatement = Bytes.toBytes(QueryUtil.getViewStatement(parentTable.getSchemaName().getString(), parentTable.getTableName().getString(), autoPartitionWhere));
                    }
                    Cell viewStatementCell = new KeyValue(cell.getRow(), cell.getFamily(), VIEW_STATEMENT_BYTES,
                        cell.getTimestamp(), Type.codeToType(cell.getTypeByte()), viewStatement);
                    cells.add(viewStatementCell);
                    
                    // set the IS_VIEW_REFERENCED column of the auto partition column row
                    Put autoPartitionPut = MetaDataUtil.getPutOnlyAutoPartitionColumn(parentTable, tableMetadata);
                    familyCellMap = autoPartitionPut.getFamilyCellMap();
                    cells = familyCellMap.get(TABLE_FAMILY_BYTES);
                    cell = cells.get(0);
                    PDataType dataType = autoPartitionCol.getDataType();
                    Object val = dataType.toObject(autoPartitionNum, PLong.INSTANCE);
                    byte[] bytes = new byte [dataType.getByteSize() + 1];
                    dataType.toBytes(val, bytes, 0);
                    Cell viewConstantCell = new KeyValue(cell.getRow(), cell.getFamily(), VIEW_CONSTANT_BYTES,
                        cell.getTimestamp(), Type.codeToType(cell.getTypeByte()), bytes);
                    cells.add(viewConstantCell);
                }
                Short indexId = null;
                if (request.hasAllocateIndexId() && request.getAllocateIndexId()) {
                    String tenantIdStr = tenantIdBytes.length == 0 ? null : Bytes.toString(tenantIdBytes);
                    try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class)) {
                        PName physicalName = parentTable.getPhysicalName();
                        int nSequenceSaltBuckets = connection.getQueryServices().getSequenceSaltBuckets();
                        SequenceKey key = MetaDataUtil.getViewIndexSequenceKey(tenantIdStr, physicalName,
                            nSequenceSaltBuckets, parentTable.isNamespaceMapped() );
                        // TODO Review Earlier sequence was created at (SCN-1/LATEST_TIMESTAMP) and incremented at the client max(SCN,dataTable.getTimestamp), but it seems we should
                        // use always LATEST_TIMESTAMP to avoid seeing wrong sequence values by different connection having SCN
                        // or not. 
                        long sequenceTimestamp = HConstants.LATEST_TIMESTAMP;
                        try {
                            connection.getQueryServices().createSequence(key.getTenantId(), key.getSchemaName(), key.getSequenceName(),
                                Short.MIN_VALUE, 1, 1, Long.MIN_VALUE, Long.MAX_VALUE, false, sequenceTimestamp);
                        } catch (SequenceAlreadyExistsException e) {
                        }
                        long[] seqValues = new long[1];
                        SQLException[] sqlExceptions = new SQLException[1];
                        connection.getQueryServices().incrementSequences(Collections.singletonList(new SequenceAllocation(key, 1)),
                            HConstants.LATEST_TIMESTAMP, seqValues, sqlExceptions);
                        if (sqlExceptions[0] != null) {
                            throw sqlExceptions[0];
                        }
                        long seqValue = seqValues[0];
                        if (seqValue > Short.MAX_VALUE) {
                            builder.setReturnCode(MetaDataProtos.MutationCode.TOO_MANY_INDEXES);
                            builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                            done.run(builder.build());
                            return;
                        }
                        Put tableHeaderPut = MetaDataUtil.getPutOnlyTableHeaderRow(tableMetadata);
                        NavigableMap<byte[], List<Cell>> familyCellMap = tableHeaderPut.getFamilyCellMap();
                        List<Cell> cells = familyCellMap.get(TABLE_FAMILY_BYTES);
                        Cell cell = cells.get(0);
                        PDataType dataType = MetaDataUtil.getViewIndexIdDataType();
                        Object val = dataType.toObject(seqValue, PLong.INSTANCE);
                        byte[] bytes = new byte [dataType.getByteSize() + 1];
                        dataType.toBytes(val, bytes, 0);
                        Cell indexIdCell = new KeyValue(cell.getRow(), cell.getFamily(), VIEW_INDEX_ID_BYTES,
                            cell.getTimestamp(), Type.codeToType(cell.getTypeByte()), bytes);
                        cells.add(indexIdCell);
                        indexId = (short) seqValue;
                    }
                }
                
                // TODO: Switch this to HRegion#batchMutate when we want to support indexes on the
                // system table. Basically, we get all the locks that we don't already hold for all the
                // tableMetadata rows. This ensures we don't have deadlock situations (ensuring
                // primary and then index table locks are held, in that order). For now, we just don't support
                // indexing on the system table. This is an issue because of the way we manage batch mutation
                // in the Indexer.
                mutateRowsWithLocks(region, tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE);

                // Invalidate the cache - the next getTable call will add it
                // TODO: consider loading the table that was just created here, patching up the parent table, and updating the cache
                Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
                if (parentCacheKey != null) {
                    metaDataCache.invalidate(parentCacheKey);
                }
                metaDataCache.invalidate(cacheKey);
                // Get timeStamp from mutations - the above method sets it if it's unset
                long currentTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
                builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
                if (indexId != null) {
                    builder.setViewIndexId(indexId);
                }
                builder.setMutationTime(currentTimeStamp);
                done.run(builder.build());
                return;
            } finally {
                releaseRowLocks(region,locks);
            }
        } catch (Throwable t) {
            logger.error("createTable failed", t);
            ProtobufUtil.setControllerException(controller,
                    ServerUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), t));
        }
    }

    /**
     * Tells whether the parent table already holds at least {@code maxIndexesPerTable} indexes.
     * The check only applies when the table being created is an index.
     *
     * @param tableType type of the table being created
     * @param parentTable table whose current index list is counted
     * @return {@code true} if {@code tableType} is {@link PTableType#INDEX} and the parent's
     *         index count is greater than or equal to {@code maxIndexesPerTable}
     */
    private boolean execeededIndexQuota(PTableType tableType, PTable parentTable) {
        if (tableType != PTableType.INDEX) {
            return false;
        }
        int existingIndexCount = parentTable.getIndexes().size();
        return existingIndexCount >= maxIndexesPerTable;
    }
    
    /**
     * Recursively collects the child views of {@code table}, and the child views of each of
     * those views, into {@code result}.
     *
     * @param region region passed through to {@code findChildViews}
     * @param tenantId tenant id passed to {@code findChildViews} for {@code table}
     * @param table table (or view) whose descendants are collected
     * @param result accumulator that receives the finder result of every level
     * @param clientTimeStamp timestamp used when loading each child view
     * @param clientVersion client version passed through to the lookups
     * @throws IOException if a lookup fails with an I/O error
     * @throws SQLException if a lookup fails with a SQL error
     */
    private void findAllChildViews(Region region, byte[] tenantId, PTable table,
            TableViewFinder result, long clientTimeStamp, int clientVersion) throws IOException, SQLException {
        TableViewFinder currResult = findChildViews(region, tenantId, table, clientVersion, false);
        result.addResult(currResult);
        for (ViewInfo viewInfo : currResult.getViewInfoList()) {
            byte[] viewTenantId = viewInfo.getTenantId();
            byte[] viewSchema = viewInfo.getSchemaName();
            byte[] viewTable = viewInfo.getViewName();
            byte[] tableKey = SchemaUtil.getTableKey(viewTenantId, viewSchema, viewTable);
            ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(tableKey);
            PTable view = loadTable(env, tableKey, cacheKey, clientTimeStamp, clientTimeStamp, clientVersion);
            if (view == null) {
                // Log the tenant id that was used to build the missing row's key, not the
                // tenant id of the parent being traversed, so the message identifies the row.
                logger.warn("Found orphan tenant view row in SYSTEM.CATALOG with tenantId:"
                        + Bytes.toString(viewTenantId) + ", schema:"
                        + Bytes.toString(viewSchema) + ", table:"
                        + Bytes.toString(viewTable));
                continue;
            }
            findAllChildViews(region, viewTenantId, view, result, clientTimeStamp, clientVersion);
        }
    }
        
    /**
     * Finds the views of {@code table} by scanning the catalog table that backs {@code region}
     * for link rows whose row key ends with the table's physical HBase name.
     * <p>
     * TODO use child link instead once splittable system catalog (PHOENIX-3534) is implemented
     * and we have a separate table for links.
     *
     * @param region region whose table is scanned and against which each found key is checked
     * @param tenantId tenant id used to bound the scan when the table is not multi-tenant
     * @param table table whose views are searched for
     * @param linkTypeBytes value the LINK_TYPE column must equal
     * @param stopAfterFirst when {@code true}, a {@code PageFilter(1)} is appended to the filters
     * @return the views found; flagged via {@code setAllViewsNotInSingleRegion()} when at least
     *         one found row key fails {@code checkTableKeyInRegion}
     * @throws IOException if the scan fails
     */
    private TableViewFinder findChildViews_deprecated(Region region, byte[] tenantId, PTable table, byte[] linkTypeBytes, boolean stopAfterFirst) throws IOException {
        byte[] schemaBytes = table.getSchemaName().getBytes();
        byte[] tableBytes = table.getTableName().getBytes();
        Scan viewScan = new Scan();
        // If the table is multi-tenant, we need to check across all tenant_ids,
        // so the row key range is left open. Otherwise, any views would have
        // the same tenantId and the scan is limited to that prefix.
        if (!table.isMultiTenant()) {
            byte[] tenantPrefix = ByteUtil.concat(tenantId, QueryConstants.SEPARATOR_BYTE_ARRAY);
            viewScan.setStartRow(tenantPrefix);
            viewScan.setStopRow(ByteUtil.nextKey(tenantPrefix));
        }

        SingleColumnValueFilter linkTypeMatch =
                new SingleColumnValueFilter(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES, CompareOp.EQUAL, linkTypeBytes);
        linkTypeMatch.setFilterIfMissing(true);
        SingleColumnValueFilter viewTypeMatch =
                new SingleColumnValueFilter(TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES, CompareOp.EQUAL,
                        PTableType.VIEW.getSerializedValue().getBytes());
        viewTypeMatch.setFilterIfMissing(false);
        String physicalName =
                SchemaUtil.getPhysicalHBaseTableName(schemaBytes, tableBytes, table.isNamespaceMapped());
        byte[] rowKeySuffix = ByteUtil.concat(QueryConstants.SEPARATOR_BYTE_ARRAY, physicalName.getBytes());
        List<Filter> scanFilters =
                Lists.<Filter>newArrayList(linkTypeMatch, viewTypeMatch, new SuffixFilter(rowKeySuffix));
        if (stopAfterFirst) {
            scanFilters.add(new PageFilter(1));
        }
        viewScan.setFilter(new FilterList(scanFilters));
        viewScan.addColumn(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES);
        viewScan.addColumn(TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES);
        viewScan.addColumn(TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);

        // Original region-only scanner modified due to PHOENIX-1208
        // RegionScanner scanner = region.getScanner(scan);
        // The following *should* work, but doesn't due to HBASE-11837
        // TableName systemCatalogTableName = region.getTableDesc().getTableName();
        // HTableInterface hTable = env.getTable(systemCatalogTableName);
        // These deprecated calls work around the issue
        try (HTableInterface catalogTable = ServerUtil.getHTableForCoprocessorScan(env,
            region.getTableDesc().getTableName().getName())) {
            // Flips to true as soon as one matching row key lies outside this region.
            boolean sawViewOutsideRegion = false;
            List<ViewInfo> foundViews = Lists.newArrayList();
            try (ResultScanner rows = catalogTable.getScanner(viewScan)) {
                Result row;
                while ((row = rows.next()) != null) {
                    ImmutableBytesWritable keyPtr = new ImmutableBytesWritable();
                    new ResultTuple(row).getKey(keyPtr);
                    if (checkTableKeyInRegion(keyPtr.copyBytes(), region) != null) {
                        sawViewOutsideRegion = true;
                    }
                    // The first three row key parts are read as tenant id, schema and view name.
                    byte[][] keyParts = new byte[3][];
                    getVarChars(row.getRow(), 3, keyParts);
                    foundViews.add(new ViewInfo(
                            keyParts[PhoenixDatabaseMetaData.TENANT_ID_INDEX],
                            keyParts[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX],
                            keyParts[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]));
                }
                TableViewFinder finder = new TableViewFinder(foundViews);
                if (sawViewOutsideRegion) {
                    finder.setAllViewsNotInSingleRegion();
                }
                return finder;
            }
        }
    }
    
    /**
     * Finds the child views of a table by scanning the rows under the table's own row key for
     * links whose LINK_TYPE equals {@code CHILD_TABLE_BYTES}.
     *
     * @param region region whose table is scanned and against which each found key is checked
     * @param tenantId tenant id part of the parent table key
     * @param schemaName schema name part of the parent table key
     * @param tableName table name part of the parent table key
     * @param stopAfterFirst when {@code true}, the link filter is combined with a
     *        {@code PageFilter(1)}
     * @return the views found; flagged via {@code setAllViewsNotInSingleRegion()} when at least
     *         one found row key fails {@code checkTableKeyInRegion}
     * @throws IOException if the scan fails
     */
    private TableViewFinder findChildViews_4_11(Region region, byte[] tenantId, byte[] schemaName, byte[] tableName, boolean stopAfterFirst) throws IOException {
        byte[] parentKey = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
        Scan linkScan = new Scan();
        linkScan.setStartRow(parentKey);
        linkScan.setStopRow(ByteUtil.nextKey(parentKey));

        SingleColumnValueFilter childLinkMatch =
                new SingleColumnValueFilter(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES, CompareOp.EQUAL, CHILD_TABLE_BYTES);
        childLinkMatch.setFilterIfMissing(true);
        Filter scanFilter;
        if (stopAfterFirst) {
            scanFilter = new FilterList(childLinkMatch, new PageFilter(1));
        } else {
            scanFilter = childLinkMatch;
        }
        linkScan.setFilter(scanFilter);
        linkScan.addColumn(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES);
        linkScan.addColumn(TABLE_FAMILY_BYTES, PARENT_TENANT_ID_BYTES);

        // Original region-only scanner modified due to PHOENIX-1208
        // RegionScanner scanner = region.getScanner(scan);
        // The following *should* work, but doesn't due to HBASE-11837
        // TableName systemCatalogTableName = region.getTableDesc().getTableName();
        // HTableInterface hTable = env.getTable(systemCatalogTableName);
        // These deprecated calls work around the issue
        try (HTableInterface catalogTable = ServerUtil.getHTableForCoprocessorScan(env,
            region.getTableDesc().getTableName().getName())) {
            // Flips to true as soon as one matching row key lies outside this region.
            boolean sawViewOutsideRegion = false;
            List<ViewInfo> foundViews = Lists.newArrayList();
            try (ResultScanner rows = catalogTable.getScanner(linkScan)) {
                Result row;
                while ((row = rows.next()) != null) {
                    ImmutableBytesWritable keyPtr = new ImmutableBytesWritable();
                    new ResultTuple(row).getKey(keyPtr);
                    if (checkTableKeyInRegion(keyPtr.copyBytes(), region) != null) {
                        sawViewOutsideRegion = true;
                    }
                    // The link row key is split into five parts: the COLUMN_NAME slot is read
                    // as the view's tenant id and the FAMILY_NAME slot as the view's full name.
                    byte[][] keyParts = new byte[5][];
                    getVarChars(row.getRow(), 5, keyParts);
                    byte[] viewTenantId = keyParts[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX];
                    byte[] viewFullName = keyParts[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX];
                    byte[] viewSchemaName = SchemaUtil.getSchemaNameFromFullName(viewFullName).getBytes();
                    byte[] viewName = SchemaUtil.getTableNameFromFullName(viewFullName).getBytes();
                    foundViews.add(new ViewInfo(viewTenantId, viewSchemaName, viewName));
                }
                TableViewFinder finder = new TableViewFinder(foundViews);
                if (sawViewOutsideRegion) {
                    finder.setAllViewsNotInSingleRegion();
                }
                return finder;
            }
        }
    }

    /**
     * Finds the direct child views of {@code table}, choosing the lookup strategy from the
     * timestamp of the loaded SYSTEM.CATALOG table.
     * <p>
     * If that timestamp is earlier than {@code MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0}, the lookup is
     * delegated to {@code findChildViews_deprecated} with {@code PHYSICAL_TABLE_BYTES} as the
     * link type; otherwise it is delegated to {@code findChildViews_4_11}.
     *
     * @param region region passed through to the selected lookup
     * @param tenantId tenant id passed through to the selected lookup
     * @param table table whose child views are searched for
     * @param clientVersion client version used when loading SYSTEM.CATALOG
     * @param stopAfterFirst passed through to the selected lookup
     * @return the finder result of the selected lookup
     * @throws IOException if loading or scanning fails with an I/O error
     * @throws SQLException if loading SYSTEM.CATALOG fails with a SQL error
     */
    private TableViewFinder findChildViews(Region region, byte[] tenantId, PTable table, int clientVersion, boolean stopAfterFirst)
            throws IOException, SQLException {
        byte[] catalogKey =
                SchemaUtil.getTableKey(ByteUtil.EMPTY_BYTE_ARRAY,
                    PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA_BYTES,
                    PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE_BYTES);
        PTable systemCatalog =
                loadTable(env, catalogKey, new ImmutableBytesPtr(catalogKey), MIN_SYSTEM_TABLE_TIMESTAMP,
                    HConstants.LATEST_TIMESTAMP, clientVersion);
        boolean catalogOlderThan4_11 = systemCatalog.getTimeStamp() < MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0;
        if (catalogOlderThan4_11) {
            return findChildViews_deprecated(region, tenantId, table, PHYSICAL_TABLE_BYTES, stopAfterFirst);
        }
        // A missing schema name is represented by an empty byte array in the table key.
        byte[] schemaNameBytes;
        if (table.getSchemaName() == null) {
            schemaNameBytes = ByteUtil.EMPTY_BYTE_ARRAY;
        } else {
            schemaNameBytes = table.getSchemaName().getBytes();
        }
        byte[] tableNameBytes = table.getTableName().getBytes();
        return findChildViews_4_11(region, tenantId, schemaNameBytes, tableNameBytes, stopAfterFirst);
    }
    
    /**
     * Handles a request to drop a table, view or index described by the mutations carried in
     * {@code request}.
     * <p>
     * The response is delivered through {@code done}. Requests for SYSTEM tables are rejected with
     * {@code UNALLOWED_TABLE_MUTATION}; a table that cannot be loaded yields
     * {@code TABLE_NOT_FOUND}. Any {@link Throwable} raised while processing is logged and
     * reported through {@code controller} instead of being propagated.
     *
     * @param controller RPC controller that receives the exception if processing fails
     * @param request drop request carrying the table type, cascade flag, client version and
     *        the metadata mutations
     * @param done callback that receives the response
     */
    @Override
    public void dropTable(RpcController controller, DropTableRequest request,
            RpcCallback<MetaDataResponse> done) {
        MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
        boolean isCascade = request.getCascade();
        byte[][] rowKeyMetaData = new byte[3][];
        String tableType = request.getTableType();
        // Declared outside the try block so the catch block can name the table in its error.
        byte[] schemaName = null;
        byte[] tableName = null;

        try {
            List<Mutation> tableMetadata = ProtobufUtil.getMutations(request);
            MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
            byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
            schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
            tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
            // Disallow deletion of a system table
            if (tableType.equals(PTableType.SYSTEM.getSerializedValue())) {
                builder.setReturnCode(MetaDataProtos.MutationCode.UNALLOWED_TABLE_MUTATION);
                builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                done.run(builder.build());
                return;
            }
            List<byte[]> tableNamesToDelete = Lists.newArrayList();
            List<SharedTableState> sharedTablesToDelete = Lists.newArrayList();
            // No need to lock parent table for views
            byte[] parentTableName = MetaDataUtil.getParentTableName(tableMetadata);
            byte[] lockTableName = parentTableName == null || tableType.equals(PTableType.VIEW.getSerializedValue()) ? tableName : parentTableName;
            byte[] lockKey = SchemaUtil.getTableKey(tenantIdBytes, schemaName, lockTableName);
            // key is always the row key of the table being dropped; lockKey is the row key of
            // the parent when a non-view table has one, and the same row otherwise.
            byte[] key =
                    parentTableName == null ? lockKey : SchemaUtil.getTableKey(tenantIdBytes,
                        schemaName, tableName);
            Region region = env.getRegion();
            // If the table key resides outside the region, return without doing anything
            MetaDataMutationResult result = checkTableKeyInRegion(key, region);
            if (result != null) {
                done.run(MetaDataMutationResult.toProto(result));
                return;
            }
            PTableType ptableType = PTableType.fromSerializedValue(tableType);
            long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
            PTable loadedTable = getTable(env, key, new ImmutableBytesPtr(key), clientTimeStamp, clientTimeStamp,
                    request.getClientVersion());
            if (loadedTable == null) {
                builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
                builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                done.run(builder.build());
                return;
            }
            // The pre-drop hook runs before any row lock is taken.
            getCoprocessorHost().preDropTable(Bytes.toString(tenantIdBytes),
                    SchemaUtil.getTableName(schemaName, tableName),
                    TableName.valueOf(loadedTable.getPhysicalName().getBytes()),
                    getParentPhysicalTableName(loadedTable), ptableType, loadedTable.getIndexes());
            List<RowLock> locks = Lists.newArrayList();

            try {
                acquireLock(region, lockKey, locks);
                // Compare by content rather than by reference: for a view with a parent, key
                // and lockKey are distinct arrays holding the same row key, and that row has
                // just been locked above.
                if (!Arrays.equals(key, lockKey)) {
                    acquireLock(region, key, locks);
                }

                List<ImmutableBytesPtr> invalidateList = new ArrayList<ImmutableBytesPtr>();
                result =
                        doDropTable(key, tenantIdBytes, schemaName, tableName, parentTableName,
                            ptableType, tableMetadata,
                            invalidateList, locks, tableNamesToDelete, sharedTablesToDelete, isCascade, request.getClientVersion());
                // Any code other than TABLE_ALREADY_EXISTS is handed back to the client as-is,
                // without committing the deletes.
                if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
                    done.run(MetaDataMutationResult.toProto(result));
                    return;
                }
                Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
                // Commit the list of deletion.
                mutateRowsWithLocks(region, tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE,
                    HConstants.NO_NONCE);
                long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata);
                // Replace each dropped entry in the cache with a deleted-table marker.
                for (ImmutableBytesPtr ckey : invalidateList) {
                    metaDataCache.put(ckey, newDeletedTableMarker(currentTime));
                }
                if (parentTableName != null) {
                    ImmutableBytesPtr parentCacheKey = new ImmutableBytesPtr(lockKey);
                    metaDataCache.invalidate(parentCacheKey);
                }
                done.run(MetaDataMutationResult.toProto(result));
                return;
            } finally {
                releaseRowLocks(region,locks);
            }
        } catch (Throwable t) {
            logger.error("dropTable failed", t);
            ProtobufUtil.setControllerException(controller,
                ServerUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), t));
        }
    }
    
    /**
     * Releases the given row locks through {@code region}; does nothing when {@code locks} is
     * {@code null}.
     *
     * @param region region on which {@code releaseRowLocks} is invoked
     * @param locks locks to release, may be {@code null}
     */
    protected void releaseRowLocks(Region region, List<RowLock> locks) {
        if (locks == null) {
            return;
        }
        region.releaseRowLocks(locks);
    }

    /**
     * Obtains a row lock for {@code lockKey} from {@code region} and, when a list is supplied,
     * records it there so the caller can release it later.
     *
     * @param region region from which the row lock is requested
     * @param lockKey row key to lock
     * @param locks list the acquired lock is appended to; may be {@code null}
     * @return the acquired lock, never {@code null}
     * @throws IOException if the region returns no lock for the row
     */
    private RowLock acquireLock(Region region, byte[] lockKey, List<RowLock> locks) throws IOException {
        RowLock acquired = region.getRowLock(lockKey, false);
        if (acquired != null) {
            if (locks != null) {
                locks.add(acquired);
            }
            return acquired;
        }
        throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(lockKey));
    }

    /**
     * Collects everything needed to drop the table, view or index stored under {@code key}.
     * <p>
     * This method does not apply any mutation itself. It appends a {@link Delete} for every row
     * returned by the table-rows scan to {@code rowsToDelete}, records cache keys to invalidate,
     * records physical/shared tables to remove, and then recurses into child views (only when
     * {@code isCascade} is set) and into indexes. Row locks taken for the recursively dropped
     * entities are appended to {@code locks}; releasing them is left to the caller.
     *
     * @param key                  row key of the entity to drop
     * @param tenantId             tenant id of the entity
     * @param schemaName           schema name of the entity
     * @param tableName            name of the entity; passed as the parent name when recursing into indexes
     * @param parentTableName      expected parent name, or {@code null} to skip the parent check
     * @param tableType            type the caller expects to find under {@code key}
     * @param rowsToDelete         in/out list of mutations; also the source of the client timestamp
     * @param invalidateList       out list of metadata cache keys to invalidate
     * @param locks                out list of row locks acquired during recursion
     * @param tableNamesToDelete   out list of physical table names to delete
     * @param sharedTablesToDelete out list of shared-table states (views and tables with a view index id)
     * @param isCascade            whether child views may be dropped along with the table
     * @param clientVersion        client version, forwarded to table building and view lookup
     * @return a result with {@code TABLE_ALREADY_EXISTS} (carrying the table and
     *         {@code tableNamesToDelete}) when all deletes were collected; otherwise
     *         {@code TABLE_NOT_FOUND}, {@code NEWER_TABLE_FOUND} or {@code UNALLOWED_TABLE_MUTATION}
     * @throws IOException  on scan or lock failures
     * @throws SQLException propagated from the table-building helpers
     */
    private MetaDataMutationResult doDropTable(byte[] key, byte[] tenantId, byte[] schemaName,
        byte[] tableName, byte[] parentTableName, PTableType tableType, List<Mutation> rowsToDelete,
        List<ImmutableBytesPtr> invalidateList, List<RowLock> locks,
        List<byte[]> tableNamesToDelete, List<SharedTableState> sharedTablesToDelete, boolean isCascade, int clientVersion) throws IOException, SQLException {


        long clientTimeStamp = MetaDataUtil.getClientTimeStamp(rowsToDelete);

        Region region = env.getRegion();
        ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);

        Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
        PTable table = (PTable)metaDataCache.getIfPresent(cacheKey);

        // We always cache the latest version - fault in if not in cache
        if (table != null
                || (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP, clientVersion)) != null) {
            if (table.getTimeStamp() < clientTimeStamp) {
                // The table is older than the request: reject if it is already deleted or is
                // not of the type the client asked to drop.
                if (isTableDeleted(table) || tableType != table.getType()) {
                    return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
                }
            } else {
                // The stored table is at or after the client timestamp.
                return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND,
                        EnvironmentEdgeManager.currentTimeMillis(), null);
            }
        }
        // We didn't find a table at the latest timestamp, so either there is no table or
        // there was a table, but it's been deleted. In either case we want to return.
        if (table == null) {
            if (buildDeletedTable(key, cacheKey, region, clientTimeStamp) != null) {
                return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
            }
            return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
        }
        // Make sure we're not deleting the "wrong" child
        if (parentTableName!=null && table.getParentTableName() != null && !Arrays.equals(parentTableName, table.getParentTableName().getBytes())) {
            return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
        }
        // Since we don't allow back in time DDL, we know if we have a table it's the one
        // we want to delete. FIXME: we shouldn't need a scan here, but should be able to
        // use the table to generate the Delete markers.
        Scan scan = MetaDataUtil.newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp);
        // Names of indexes discovered through link rows; dropped recursively after the scan.
        List<byte[]> indexNames = Lists.newArrayList();
        // Cells of the row currently being processed; refilled on every scanner.next call.
        List<Cell> results = Lists.newArrayList();
        try (RegionScanner scanner = region.getScanner(scan);) {
            scanner.next(results);
            if (results.isEmpty()) { // Should not be possible
                return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
                        EnvironmentEdgeManager.currentTimeMillis(), null);
            }

            if (tableType == PTableType.TABLE || tableType == PTableType.SYSTEM) {
                // Handle any child views that exist
                TableViewFinder tableViewFinderResult = findChildViews(region, tenantId, table, clientVersion, !isCascade);
                if (tableViewFinderResult.hasViews()) {
                    if (isCascade) {
                        // NOTE(review): if neither of the two predicates below holds, no view is
                        // dropped and processing simply continues - confirm this is intended.
                        if (tableViewFinderResult.allViewsInMultipleRegions()) {
                            // We don't yet support deleting a table with views where SYSTEM.CATALOG has split and the
                            // view metadata spans multiple regions
                            return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION,
                                    EnvironmentEdgeManager.currentTimeMillis(), null);
                        } else if (tableViewFinderResult.allViewsInSingleRegion()) {
                            // Recursively delete views - safe as all the views are in the same region
                            for (ViewInfo viewInfo : tableViewFinderResult.getViewInfoList()) {
                                byte[] viewTenantId = viewInfo.getTenantId();
                                byte[] viewSchemaName = viewInfo.getSchemaName();
                                byte[] viewName = viewInfo.getViewName();
                                byte[] viewKey = SchemaUtil.getTableKey(viewTenantId, viewSchemaName, viewName);
                                Delete delete = new Delete(viewKey, clientTimeStamp);
                                rowsToDelete.add(delete);
                                // Lock the view row before recursing; the lock is tracked in 'locks'.
                                acquireLock(region, viewKey, locks);
                                MetaDataMutationResult result = doDropTable(viewKey, viewTenantId, viewSchemaName,
                                        viewName, null, PTableType.VIEW, rowsToDelete, invalidateList, locks,
                                        tableNamesToDelete, sharedTablesToDelete, false, clientVersion);
                                // TABLE_ALREADY_EXISTS is the success code of this method; anything else aborts the drop.
                                if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) { return result; }
                            }
                        }
                    } else {
                        // DROP without CASCADE on tables with child views is not permitted
                        return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION,
                                EnvironmentEdgeManager.currentTimeMillis(), null);
                    }
                }
            }

            // Add to list of HTables to delete, unless it's a view or it's a shared index
            if (tableType != PTableType.VIEW && table.getViewIndexId()==null) { 
                tableNamesToDelete.add(table.getPhysicalName().getBytes());
            }
            else {
                sharedTablesToDelete.add(new SharedTableState(table));
            }
            invalidateList.add(cacheKey);
            // Reused buffer receiving the row-key components of each scanned row.
            byte[][] rowKeyMetaData = new byte[5][];
            do {
                // NOTE(review): the cell at LINK_TYPE_INDEX is used both for link detection and
                // as the source of the row key for the Delete below - confirm the index is
                // always valid for the rows returned by the scan.
                Cell kv = results.get(LINK_TYPE_INDEX);
                int nColumns = getVarChars(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), 0, rowKeyMetaData);
                // A link row has all five key parts, a non-empty family-name slot and the LINK_TYPE qualifier.
                if (nColumns == 5
                        && rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX].length > 0
                        && Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(),
                                LINK_TYPE_BYTES, 0, LINK_TYPE_BYTES.length) == 0) {
                        LinkType linkType = LinkType.fromSerializedValue(kv.getValueArray()[kv.getValueOffset()]);
                        if (rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length == 0 && linkType == LinkType.INDEX_TABLE) {
                            // For index links the family-name slot carries the index name.
                            indexNames.add(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
                        } else if (linkType == LinkType.PARENT_TABLE || linkType == LinkType.PHYSICAL_TABLE) {
                            // delete parent->child link for views
                            Cell parentTenantIdCell = MetaDataUtil.getCell(results, PhoenixDatabaseMetaData.PARENT_TENANT_ID_BYTES);
                            PName parentTenantId = parentTenantIdCell!=null ? PNameFactory.newName(parentTenantIdCell.getValueArray(), parentTenantIdCell.getValueOffset(), parentTenantIdCell.getValueLength()) : null;
                            byte[] linkKey = MetaDataUtil.getChildLinkKey(parentTenantId, table.getParentSchemaName(), table.getParentTableName(), table.getTenantId(), table.getName());
                            Delete linkDelete = new Delete(linkKey, clientTimeStamp);
                            rowsToDelete.add(linkDelete);
                        }
                }
                // FIXME: Remove when unintentionally deprecated method is fixed (HBASE-7870).
                // FIXME: the version of the Delete constructor without the lock args was introduced
                // in 0.94.4, thus if we try to use it here we can no longer use the 0.94.2 version
                // of the client.
                // Every scanned row, link row or not, gets a Delete at the client timestamp.
                Delete delete = new Delete(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), clientTimeStamp);
                rowsToDelete.add(delete);
                results.clear();
                scanner.next(results);
            } while (!results.isEmpty());
        }
        
        // Recursively delete indexes
        for (byte[] indexName : indexNames) {
            byte[] indexKey = SchemaUtil.getTableKey(tenantId, schemaName, indexName);
            // FIXME: Remove when unintentionally deprecated method is fixed (HBASE-7870).
            // FIXME: the version of the Delete constructor without the lock args was introduced
            // in 0.94.4, thus if we try to use it here we can no longer use the 0.94.2 version
            // of the client.
            Delete delete = new Delete(indexKey, clientTimeStamp);
            rowsToDelete.add(delete);
            acquireLock(region, indexKey, locks);
            // The current table name is passed as the expected parent of the index.
            MetaDataMutationResult result =
                    doDropTable(indexKey, tenantId, schemaName, indexName, tableName, PTableType.INDEX,
                        rowsToDelete, invalidateList, locks, tableNamesToDelete, sharedTablesToDelete, false, clientVersion);
            if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
                return result;
            }
        }

        // Success: TABLE_ALREADY_EXISTS signals that the table was found and its deletes were collected.
        return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS,
                EnvironmentEdgeManager.currentTimeMillis(), table, tableNamesToDelete);
    }

    /**
     * Callback invoked by {@code mutateColumn} once the target table has been located,
     * locked and validated. The implementation performs the column-specific work.
     * <p>
     * A non-null result whose code is not {@code TABLE_ALREADY_EXISTS} makes
     * {@code mutateColumn} return that result without applying {@code tableMetadata}.
     */
    private static interface ColumnMutator {
        MetaDataMutationResult updateMutation(
                PTable table,
                byte[][] rowKeyMetaData,
                List<Mutation> tableMetadata,
                Region region,
                List<ImmutableBytesPtr> invalidateList,
                List<RowLock> locks,
                long clientTimeStamp) throws IOException, SQLException;
    }

    /**
     * Shared driver for column-level metadata changes on a single table.
     * <p>
     * Steps, all performed while holding the row lock on the table key:
     * <ol>
     * <li>derive tenant/schema/table from {@code tableMetadata} and verify the key is in this region;</li>
     * <li>load the table from the metadata cache, or build it from the region;</li>
     * <li>reject the request if the table is missing, deleted, newer than the client timestamp,
     *     has an unexpected sequence number, is an index, or is not of the expected type;</li>
     * <li>delegate to {@code mutator}; a non-null result other than {@code TABLE_ALREADY_EXISTS} is returned as is;</li>
     * <li>apply {@code tableMetadata}, invalidate the cache entries in the invalidate list and
     *     return either the mutator's result or a freshly built table.</li>
     * </ol>
     * Locks collected in the local lock list (including any added by the mutator) are released in
     * the {@code finally} block.
     *
     * @param tableMetadata mutations sent by the client; also the source of the client timestamp
     *                      and the expected sequence number
     * @param mutator       callback performing the column-specific validation and bookkeeping
     * @param clientVersion client version, forwarded to table building
     * @return the outcome of the mutation; see the step list above for the possible codes
     * @throws IOException wrapping any failure raised while processing the request
     */
    private MetaDataMutationResult
    mutateColumn(List<Mutation> tableMetadata, ColumnMutator mutator, int clientVersion) throws IOException {
        byte[][] rowKeyMetaData = new byte[5][];
        MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
        byte[] tenantId = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
        byte[] schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
        byte[] tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
        byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
        try {
            Region region = env.getRegion();
            // A non-null result means the key check failed; hand it straight back to the client.
            MetaDataMutationResult result = checkTableKeyInRegion(key, region);
            if (result != null) {
                return result;
            }
            List<RowLock> locks = Lists.newArrayList();
            try {
                acquireLock(region, key, locks);
                ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
                // The table's own cache entry is always invalidated; the mutator may add more keys.
                List<ImmutableBytesPtr> invalidateList = new ArrayList<ImmutableBytesPtr>();
                invalidateList.add(cacheKey);
                Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
                PTable table = (PTable)metaDataCache.getIfPresent(cacheKey);
                if (logger.isDebugEnabled()) {
                    if (table == null) {
                        logger.debug("Table " + Bytes.toStringBinary(key)
                                + " not found in cache. Will build through scan");
                    } else {
                        logger.debug("Table " + Bytes.toStringBinary(key)
                                + " found in cache with timestamp " + table.getTimeStamp()
                                + " seqNum " + table.getSequenceNumber());
                    }
                }
                // Get client timeStamp from mutations
                long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
                if (table == null
                        && (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP, clientVersion)) == null) {
                    // No live table: check whether a deleted version exists as of the client
                    // timestamp to distinguish "newer table found" from "table not found".
                    table = buildDeletedTable(key, cacheKey, region, clientTimeStamp);
                    if (table != null) {
                        logger.info("Found newer table deleted as of " + table.getTimeStamp() + " versus client timestamp of " + clientTimeStamp);
                        return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND,
                                EnvironmentEdgeManager.currentTimeMillis(), null);
                    }
                    return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
                            EnvironmentEdgeManager.currentTimeMillis(), null);
                }
                if (table.getTimeStamp() >= clientTimeStamp) {
                    logger.info("Found newer table as of " + table.getTimeStamp() + " versus client timestamp of "
                            + clientTimeStamp);
                    return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND,
                            EnvironmentEdgeManager.currentTimeMillis(), table);
                } else if (isTableDeleted(table)) { return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
                        EnvironmentEdgeManager.currentTimeMillis(), null); }
                // The client sends the sequence number the table will have after the change, so
                // the currently stored table must be exactly one behind it.
                long expectedSeqNum = MetaDataUtil.getSequenceNumber(tableMetadata) - 1; // lookup TABLE_SEQ_NUM in
                                                                                         // tableMetaData

                if (logger.isDebugEnabled()) {
                    logger.debug("For table " + Bytes.toStringBinary(key) + " expecting seqNum "
                            + expectedSeqNum + " and found seqNum " + table.getSequenceNumber()
                            + " with " + table.getColumns().size() + " columns: "
                            + table.getColumns());
                }
                if (expectedSeqNum != table.getSequenceNumber()) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("For table " + Bytes.toStringBinary(key)
                                + " returning CONCURRENT_TABLE_MUTATION due to unexpected seqNum");
                    }
                    return new MetaDataMutationResult(MutationCode.CONCURRENT_TABLE_MUTATION,
                            EnvironmentEdgeManager.currentTimeMillis(), table);
                }

                PTableType type = table.getType();
                if (type == PTableType.INDEX) {
                    // Disallow mutation of an index table
                    return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION,
                            EnvironmentEdgeManager.currentTimeMillis(), null);
                } else {
                    // server-side, except for indexing, we always expect the keyvalues to be standard KeyValues
                    PTableType expectedType = MetaDataUtil.getTableType(tableMetadata, GenericKeyValueBuilder.INSTANCE,
                            new ImmutableBytesWritable());
                    // The client expected a table but we found a view, or vice versa
                    if (type != expectedType) { return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
                            EnvironmentEdgeManager.currentTimeMillis(), null); }
                }
                result = mutator.updateMutation(table, rowKeyMetaData, tableMetadata, region,
                            invalidateList, locks, clientTimeStamp);
                // if the update mutation caused tables to be deleted, the mutation code returned will be MutationCode.TABLE_ALREADY_EXISTS 
                if (result != null && result.getMutationCode()!=MutationCode.TABLE_ALREADY_EXISTS) {
                    return result;
                }
                // Validation passed: apply the client's mutations to the region.
                mutateRowsWithLocks(region, tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE);
                // Invalidate from cache
                for (ImmutableBytesPtr invalidateKey : invalidateList) {
                    metaDataCache.invalidate(invalidateKey);
                }
                // Get client timeStamp from mutations, since it may get updated by the
                // mutateRowsWithLocks call
                long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata);
                // if the update mutation caused tables to be deleted just return the result which will contain the table to be deleted
                if (result !=null) {
                    return result;
                } else {
                    // Rebuild the table so the response carries the post-mutation metadata.
                    table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP, clientVersion);
                    return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, currentTime, table);
                }
            } finally {
                releaseRowLocks(region,locks);
            }
        } catch (Throwable t) {
            // NOTE(review): throwIOException is expected to always throw, which is why the
            // return below is marked impossible - confirm against ServerUtil.
            ServerUtil.throwIOException(SchemaUtil.getTableName(schemaName, tableName), t);
            return null; // impossible
        }
    }
    
    private final class PutWithOrdinalPosition implements Comparable<PutWithOrdinalPosition>{
        private final Put put;
        private final int ordinalPosition;
        
        public PutWithOrdinalPosition(Put put, int ordinalPos) {
            this.put = put;
            this.ordinalPosition = ordinalPos;
        }

        @Override
        public int compareTo(PutWithOrdinalPosition o) {
            return (this.ordinalPosition < o.ordinalPosition ? -1 : this.ordinalPosition > o.ordinalPosition ? 1 : 0);
        }
    }
    
    /**
     * Computes the ordinal position of {@code col} within {@code table}.
     * <p>
     * When the table has a bucket number the column's position is returned unchanged; otherwise
     * it is shifted up by one.
     * NOTE(review): presumably the salt column already occupies a slot in bucketed tables, making
     * positions 1-based there - confirm.
     *
     * @param table table the column belongs to
     * @param col   column whose ordinal position is wanted
     * @return the ordinal position of the column
     */
    private static int getOrdinalPosition(PTable table, PColumn col) {
        if (table.getBucketNum() != null) {
            return col.getPosition();
        }
        return col.getPosition() + 1;
    }
    
    /**
     * Tells whether {@code view} is flagged as diverged, i.e. its base column count equals the
     * {@link QueryConstants#DIVERGED_VIEW_BASE_COLUMN_COUNT} marker value.
     *
     * @param view view to inspect
     * @return {@code true} if the view carries the diverged marker
     */
    private static boolean isDivergedView(PTable view) {
        return QueryConstants.DIVERGED_VIEW_BASE_COLUMN_COUNT == view.getBaseColumnCount();
    }
    
    /**
     * Ordered list of column keys whose ordinal positions need to be rewritten while a batch of
     * columns is being added.
     * <p>
     * The ordinal position of the entry at list index {@code i} is {@code i + offset}.
     */
    private static class ColumnOrdinalPositionUpdateList {
        final List<byte[]> columnKeys = new ArrayList<>(10);
        int offset;

        /** @return number of column keys currently tracked */
        int size()  {
            return columnKeys.size();
        }

        /** Sets the ordinal position that list index 0 corresponds to. */
        private void setOffset(int lowestOrdinalPos) {
            this.offset = lowestOrdinalPos;
        }

        /** Appends the column key at the end of the list. */
        private void addColumn(byte[] columnKey) {
            columnKeys.add(columnKey);
        }

        /** Removes the first entry equal to {@code columnKey}, if there is one. */
        private void dropColumn(byte[] columnKey) {
            int found = indexOf(columnKey);
            if (found >= 0) {
                columnKeys.remove(found);
            }
        }

        /**
         * Places {@code columnKey} at the list slot corresponding to {@code position}.
         * <p>
         * If the key is not yet tracked it is inserted, pushing later entries one slot to the
         * right. If it is already tracked at or after the target slot, the entries between the
         * target slot and its current slot are shifted one slot to the right and the key takes
         * the target slot.
         *
         * @param columnKey key of the column
         * @param position  desired ordinal position; must be at least {@code offset} and at most
         *                  one past the last tracked position
         */
        private void addColumn(byte[] columnKey, int position) {
            checkArgument(position >= this.offset);
            final int targetIdx = position - offset;
            checkState(targetIdx <= columnKeys.size());

            final int currentIdx = indexOf(columnKey);
            if (currentIdx < 0) {
                // Not tracked yet (this also covers the empty list, where targetIdx can only be 0).
                columnKeys.add(targetIdx, columnKey);
                return;
            }
            // Already tracked: slide everything in [targetIdx, currentIdx) one slot to the right,
            // overwriting the key's old slot, then drop the key into the freed target slot.
            for (int slot = currentIdx; slot > targetIdx; slot--) {
                columnKeys.set(slot, columnKeys.get(slot - 1));
            }
            columnKeys.set(targetIdx, columnKey);
        }

        /** Translates a list index into the ordinal position it represents. */
        private int getOrdinalPositionFromListIdx(int listIndex) {
            checkArgument(listIndex < columnKeys.size());
            return offset + listIndex;
        }

        /**
         * @param columnKey
         * @return if present - the ordinal position of the column in this list.
         * If not present - -1.
         */
        private int getOrdinalPositionOfColumn(byte[] columnKey) {
            int found = indexOf(columnKey);
            return found < 0 ? -1 : found + offset;
        }

        /**
         * Linear search for the first entry whose bytes equal {@code columnKey}.
         *
         * @return the list index of the match, or -1 if the key is not tracked
         */
        private int indexOf(byte[] columnKey) {
            int idx = 0;
            for (byte[] candidate : columnKeys) {
                if (Bytes.equals(candidate, columnKey)) {
                    return idx;
                }
                idx++;
            }
            return -1;
        }
    }
    
    /**
     * Builds the row key of {@code column} under the view identified by {@code viewKey}.
     *
     * @param viewKey row key of the owning view
     * @param column  column to build the key for; its family name may be {@code null}
     * @return the view key followed by the column name and, when present, the family name
     */
    private static byte[] getColumnKey(byte[] viewKey, PColumn column) {
        String columnName = column.getName().getString();
        String familyName = null;
        if (column.getFamilyName() != null) {
            familyName = column.getFamilyName().getString();
        }
        return getColumnKey(viewKey, columnName, familyName);
    }
    
    /**
     * Builds a column row key by concatenating the view key, a separator and the column name,
     * followed by another separator and the column family when one is given.
     *
     * @param viewKey      row key of the owning view
     * @param columnName   name of the column
     * @param columnFamily column family name, or {@code null} if there is none
     * @return the assembled key
     */
    private static byte[] getColumnKey(byte[] viewKey, String columnName, String columnFamily) {
        final byte[] keyWithName = ByteUtil.concat(viewKey, QueryConstants.SEPARATOR_BYTE_ARRAY,
                Bytes.toBytes(columnName));
        if (columnFamily == null) {
            return keyWithName;
        }
        return ByteUtil.concat(keyWithName, QueryConstants.SEPARATOR_BYTE_ARRAY,
                Bytes.toBytes(columnFamily));
    }
    
    /**
     * Checks whether the metadata mutations would flip a boolean table attribute.
     * <p>
     * The first {@link Put} that carries a cell for {@code attrQualifier} in the table family
     * decides the outcome; later mutations are not inspected.
     *
     * @param table         table being altered (not consulted by this method)
     * @param currAttribute current value of the attribute
     * @param tableMetaData mutations to search
     * @param attrQualifier qualifier of the attribute cell
     * @return {@code true} if a new value is present and differs from {@code currAttribute};
     *         {@code false} if it is equal or no put sets the attribute
     */
    private boolean switchAttribute(PTable table, boolean currAttribute, List<Mutation> tableMetaData, byte[] attrQualifier) {
        for (Mutation mutation : tableMetaData) {
            if (!(mutation instanceof Put)) {
                continue;
            }
            List<Cell> attrCells = ((Put) mutation).get(TABLE_FAMILY_BYTES, attrQualifier);
            if (attrCells == null || attrCells.isEmpty()) {
                continue;
            }
            Cell first = attrCells.get(0);
            boolean requested = (boolean) PBoolean.INSTANCE.toObject(
                    first.getValueArray(), first.getValueOffset(), first.getValueLength());
            return requested != currAttribute;
        }
        return false;
    }
    
    private MetaDataMutationResult addColumnsAndTablePropertiesToChildViews(PTable basePhysicalTable, List<Mutation> tableMetadata, List<Mutation> mutationsForAddingColumnsToViews, byte[] schemaName, byte[] tableName,
            List<ImmutableBytesPtr> invalidateList, long clientTimeStamp, TableViewFinder childViewsResult,
            Region region, List<RowLock> locks, int clientVersion) throws IOException, SQLException {
        List<PutWithOrdinalPosition> columnPutsForBaseTable = Lists.newArrayListWithExpectedSize(tableMetadata.size());
        Map<TableProperty, Cell> tablePropertyCellMap = Maps.newHashMapWithExpectedSize(tableMetadata.size());
        // Isolate the puts relevant to adding columns. Also figure out what kind of columns are being added.
        for (Mutation m : tableMetadata) {
            if (m instanceof Put) {
                byte[][] rkmd = new byte[5][];
                int pkCount = getVarChars(m.getRow(), rkmd);
                // check if this put is for adding a column
                if (pkCount > COLUMN_NAME_INDEX
                        && rkmd[COLUMN_NAME_INDEX] != null && rkmd[COLUMN_NAME_INDEX].length > 0
                        && Bytes.compareTo(schemaName, rkmd[SCHEMA_NAME_INDEX]) == 0
                        && Bytes.compareTo(tableName, rkmd[TABLE_NAME_INDEX]) == 0) {
                    columnPutsForBaseTable.add(new PutWithOrdinalPosition((Put)m, getInteger((Put)m, TABLE_FAMILY_BYTES, ORDINAL_POSITION_BYTES)));
                }
                // check if the put is for a table property
                else if (pkCount <= COLUMN_NAME_INDEX
                        && Bytes.compareTo(schemaName, rkmd[SCHEMA_NAME_INDEX]) == 0
                        && Bytes.compareTo(tableName, rkmd[TABLE_NAME_INDEX]) == 0) {
                    for (Cell cell : m.getFamilyCellMap().get(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES)) {
                        for (TableProperty tableProp : TableProperty.values()) {
                            byte[] propNameBytes = Bytes.toBytes(tableProp.getPropertyName());
                            if (Bytes.compareTo(propNameBytes, 0, propNameBytes.length, cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())==0
                                    && tableProp.isValidOnView() && tableProp.isMutable()) {
                                Cell tablePropCell = CellUtil.createCell(cell.getRow(), CellUtil.cloneFamily(cell),
                                    CellUtil.cloneQualifier(cell), cell.getTimestamp(), cell.getTypeByte(),
                                    CellUtil.cloneValue(cell));
                                tablePropertyCellMap.put(tableProp, tablePropCell);
                            }
                        }
                    }
                }
            }
        }
        // Sort the puts by ordinal position 
        Collections.sort(columnPutsForBaseTable);
        for (ViewInfo viewInfo : childViewsResult.getViewInfoList()) {
            short deltaNumPkColsSoFar = 0;
            short columnsAddedToView = 0;
            short columnsAddedToBaseTable = 0;
            byte[] tenantId = viewInfo.getTenantId();
            byte[] schema = viewInfo.getSchemaName();
            byte[] table = viewInfo.getViewName();
            byte[] viewKey = SchemaUtil.getTableKey(tenantId, schema, table);
            
            // lock the rows corresponding to views so that no other thread can modify the view meta-data
            RowLock viewRowLock = acquireLock(region, viewKey, locks);
            PTable view = doGetTable(viewKey, clientTimeStamp, viewRowLock, clientVersion);
            if (view == null) {
                logger.warn("Found orphan tenant view row in SYSTEM.CATALOG with tenantId:"
                    + Bytes.toString(tenantId) + ", schema:"
                    + Bytes.toString(schema) + ", table:"
                    + Bytes.toString(table));
                continue;
            }
            /*
             * Disallow adding columns to a base table with APPEND_ONLY_SCHEMA since this
             * creates a gap in the column positions for every view (PHOENIX-4737).
             */
            if (!columnPutsForBaseTable.isEmpty() && view.isAppendOnlySchema()) {
                return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable);
            }
            
            ColumnOrdinalPositionUpdateList ordinalPositionList = new ColumnOrdinalPositionUpdateList();
            List<PColumn> viewPkCols = new ArrayList<>(view.getPKColumns());
            boolean addingExistingPkCol = false;
            int numCols = view.getColumns().size();
            // add the new columns to the child view
            for (PutWithOrdinalPosition p : columnPutsForBaseTable) {
                Put baseTableColumnPut = p.put;
                PColumn existingViewColumn = null;
                byte[][] rkmd = new byte[5][];
                getVarChars(baseTableColumnPut.getRow(), rkmd);
                String columnName = Bytes.toString(rkmd[COLUMN_NAME_INDEX]);
                String columnFamily = rkmd[FAMILY_NAME_INDEX] == null ? null : Bytes.toString(rkmd[FAMILY_NAME_INDEX]);
                try {
                    existingViewColumn = columnFamily == null ? view.getColumnForColumnName(columnName) : view.getColumnFamily(
                            columnFamily).getPColumnForColumnName(columnName);
                } catch (ColumnFamilyNotFoundException e) {
                    // ignore since it means that the column family is not present for the column to be added.
                } catch (ColumnNotFoundException e) {
                    // ignore since it means the column is not present in the view
                }
                
                boolean isPkCol = columnFamily == null;
                byte[] columnKey = getColumnKey(viewKey, columnName, columnFamily);
                if (existingViewColumn != null) {
                    MetaDataMutationResult result = validateColumnForAddToBaseTable(existingViewColumn, baseTableColumnPut, basePhysicalTable, isPkCol, view);
                    if (result != null) {
                        return result;
                    }
                    if (isPkCol) {
                        viewPkCols.remove(existingViewColumn);
                        addingExistingPkCol = true;
                    }
                    /*
                     * For views that are not diverged, we need to make sure that the existing columns
                     * have the same ordinal position as in the base table. This is important because
                     * we rely on the ordinal position of the column to figure out whether dropping a 
                     * column from the view will end up diverging the view from the base table.
                     * 
                     * For already diverged views, we don't care about the ordinal position of the existing column.
                     */
                    if (!isDivergedView(view)) {
                        int newOrdinalPosition = p.ordinalPosition;
                        // Check if the ordinal position of the column was getting updated from previous add column
                        // mutations.
                        int existingOrdinalPos = ordinalPositionList.getOrdinalPositionOfColumn(columnKey);
                        if (ordinalPositionList.size() == 0) {
                            /*
                             * No ordinal positions to be updated are in the list. In that case, check whether the
                             * existing ordinal position of the column is different from its new ordinal position.
                             * If yes, then initialize the ordinal position list with this column's ordinal position
                             * as the offset.
                             */
                            existingOrdinalPos = getOrdinalPosition(view, existingViewColumn);
                            if (existingOrdinalPos != newOrdinalPosition) {
                                ordinalPositionList.setOffset(newOrdinalPosition);
                                ordinalPositionList.addColumn(columnKey, newOrdinalPosition);
                                for (PColumn col : view.getColumns()) {
                                    int ordinalPos = getOrdinalPosition(view, col);
                                    if (ordinalPos >= newOrdinalPosition) {
                                        if (ordinalPos == existingOrdinalPos) {
                                            /*
                                             * No need to update ordinal positions of columns beyond the existing column's 
                                             * old ordinal position.
                                             */
                                            break;
                                        }
                                        // increment ordinal position of columns occurring after this column by 1
                                        int updatedPos = ordinalPos + 1;
                                        ordinalPositionList.addColumn(getColumnKey(viewKey, col), updatedPos);
                                    } 
                                }
                            } 
                        } else {
                            if (existingOrdinalPos != newOrdinalPosition) {
                                ordinalPositionList.addColumn(columnKey, newOrdinalPosition);
                            }
                        }
                        columnsAddedToBaseTable++;
                    }
                } else {
                    // The column doesn't exist in the view.
                    Put viewColumnPut = new Put(columnKey, clientTimeStamp);
                    for (Cell cell : baseTableColumnPut.getFamilyCellMap().values().iterator().next()) {
                        viewColumnPut.add(CellUtil.createCell(columnKey, CellUtil.cloneFamily(cell),
                                CellUtil.cloneQualifier(cell), cell.getTimestamp(), cell.getTypeByte(),
                                CellUtil.cloneValue(cell)));
                    }
                    if (isDivergedView(view)) {
                        if (isPkCol) {
                            /* 
                             * Only pk cols of the base table are added to the diverged views. These pk 
                             * cols are added at the end.
                             */
                            int lastOrdinalPos = getOrdinalPosition(view, view.getColumns().get(numCols - 1));
                            int newPosition = ++lastOrdinalPos;
                            byte[] ptr = new byte[PInteger.INSTANCE.getByteSize()];
                            PInteger.INSTANCE.getCodec().encodeInt(newPosition, ptr, 0);
                            viewColumnPut.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                                    PhoenixDatabaseMetaData.ORDINAL_POSITION_BYTES, clientTimeStamp, ptr);
                            mutationsForAddingColumnsToViews.add(viewColumnPut);
                        } else {
                            continue; // move on to the next column
                        }
                    } else {
                        int newOrdinalPosition = p.ordinalPosition;
                        /*
                         * For a non-diverged view, we need to make sure that the base table column
                         * is added at the right position.
                         */
                        if (ordinalPositionList.size() == 0) {
                            ordinalPositionList.setOffset(newOrdinalPosition);
                            ordinalPositionList.addColumn(columnKey, newOrdinalPosition);
                            for (PColumn col : view.getColumns()) {
                                int ordinalPos = getOrdinalPosition(view, col);
                                if (ordinalPos >= newOrdinalPosition) {
                                    // increment ordinal position of columns by 1
                                    int updatedPos = ordinalPos + 1;
                                    ordinalPositionList.addColumn(getColumnKey(viewKey, col), updatedPos);
                                } 
                            }
                        } else {
                            ordinalPositionList.addColumn(columnKey, newOrdinalPosition);
                        }
                        mutationsForAddingColumnsToViews.add(viewColumnPut);
                    }
                    if (isPkCol) {
                        deltaNumPkColsSoFar++;
                        // Set the key sequence for the pk column to be added
                        short currentKeySeq = SchemaUtil.getMaxKeySeq(view);
                        short newKeySeq = (short)(currentKeySeq + deltaNumPkColsSoFar);
                        byte[] keySeqBytes = new byte[PSmallint.INSTANCE.getByteSize()];
                        PSmallint.INSTANCE.getCodec().encodeShort(newKeySeq, keySeqBytes, 0);
                        viewColumnPut.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                                PhoenixDatabaseMetaData.KEY_SEQ_BYTES, keySeqBytes);
                        addMutationsForAddingPkColsToViewIndexes(mutationsForAddingColumnsToViews, clientTimeStamp, view,
                                deltaNumPkColsSoFar, columnName, viewColumnPut);
                    }
                    columnsAddedToView++;
                    columnsAddedToBaseTable++;
                }
            }
            /*
             * Allow adding a pk columns to base table : 1. if all the view pk columns are exactly the same as the base
             * table pk columns 2. if we are adding all the existing view pk columns to the base table
             */ 
            if (addingExistingPkCol && !viewPkCols.equals(basePhysicalTable.getPKColumns())) {
                return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable);
            }
            addViewIndexesHeaderRowMutations(mutationsForAddingColumnsToViews, invalidateList, clientTimeStamp, view,
                    deltaNumPkColsSoFar);
            
            // set table properties in child view
            if (!tablePropertyCellMap.isEmpty()) {
                Put viewHeaderRowPut = new Put(viewKey, clientTimeStamp);
                for (TableProperty tableProp : TableProperty.values()) {
                    Cell tablePropertyCell = tablePropertyCellMap.get(tableProp);
                    if ( tablePropertyCell != null) {
                        // set this table property on the view :
                        // 1. if it is not mutable on a view (which means the property is always the same as the base table)
                        // 2. or if it is mutable on a view and if it doesn't exist on the view
                        // 3. or if it is mutable on a view and the property value is the same as the base table property (which means it wasn't changed on the view)
                        Object viewProp = tableProp.getPTableValue(view);
                        if (!tableProp.isMutableOnView() || viewProp==null || viewProp.equals(tableProp.getPTableValue(basePhysicalTable))) {
                            viewHeaderRowPut.add(CellUtil.createCell(viewKey, CellUtil.cloneFamily(tablePropertyCell),
                                CellUtil.cloneQualifier(tablePropertyCell), clientTimeStamp, tablePropertyCell.getTypeByte(),
                                CellUtil.cloneValue(tablePropertyCell)));
                        }
                    }
                }
                byte[] viewSequencePtr = new byte[PLong.INSTANCE.getByteSize()];
                PLong.INSTANCE.getCodec().encodeLong(view.getSequenceNumber() + 1, viewSequencePtr, 0);
                viewHeaderRowPut.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                        PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES, clientTimeStamp, viewSequencePtr);
                // invalidate the view so that it is removed from the cache
                invalidateList.add(new ImmutableBytesPtr(viewKey));

                mutationsForAddingColumnsToViews.add(viewHeaderRowPut);
            }
            
            /*
             * Increment the sequence number by 1 if:
             * 1) For a diverged view, there were columns (pk columns) added to the view.
             * 2) For a non-diverged view if the base column count changed.
             */
            boolean changeSequenceNumber = (isDivergedView(view) && columnsAddedToView > 0)
                    || (!isDivergedView(view) && columnsAddedToBaseTable > 0);
            updateViewHeaderRow(basePhysicalTable, tableMetadata, mutationsForAddingColumnsToViews,
                invalidateList, clientTimeStamp, columnsAddedToView, columnsAddedToBaseTable,
                viewKey, view, ordinalPositionList, numCols, changeSequenceNumber);
        }
        return null;
    }

    /**
     * Generates the mutations that bring a view's header row and the ordinal positions of its
     * columns in line with columns being added to or dropped from its base table.
     *
     * @param basePhysicalTable the base table whose change is being propagated
     * @param tableMetadata the mutations sent by the client for the base table
     * @param mutationsForAddingColumnsToViews receives the mutations generated for the view
     * @param invalidateList receives the keys of the cache entries to invalidate
     * @param clientTimeStamp timestamp applied to the generated cells
     * @param viewColumnDelta change to apply to the view's column count
     * @param baseTableColumnDelta change to apply to the view's base column count
     * @param viewKey header row key of the view
     * @param view the view as it is currently defined
     * @param ordinalPositionList column keys whose ordinal positions have to be rewritten
     * @param numCols number of columns the view currently has
     * @param changeSequenceNumber whether the sequence number of the view has to be incremented
     */
    private void updateViewHeaderRow(PTable basePhysicalTable, List<Mutation> tableMetadata,
            List<Mutation> mutationsForAddingColumnsToViews,
            List<ImmutableBytesPtr> invalidateList, long clientTimeStamp, short viewColumnDelta,
            short baseTableColumnDelta, byte[] viewKey, PTable view,
            ColumnOrdinalPositionUpdateList ordinalPositionList, int numCols, boolean changeSequenceNumber) {
        Put headerPut = new Put(viewKey, clientTimeStamp);

        // The base column count is only written for views that have NOT diverged.
        boolean diverged = isDivergedView(view);
        if (!diverged && baseTableColumnDelta != 0) {
            int updatedBaseColumnCount = view.getBaseColumnCount() + baseTableColumnDelta;
            byte[] encodedBaseColumnCount = new byte[PInteger.INSTANCE.getByteSize()];
            PInteger.INSTANCE.getCodec().encodeInt(updatedBaseColumnCount, encodedBaseColumnCount, 0);
            headerPut.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                    PhoenixDatabaseMetaData.BASE_COLUMN_COUNT_BYTES, clientTimeStamp,
                    encodedBaseColumnCount);
        }

        // Total column count of the view.
        if (viewColumnDelta != 0) {
            int updatedColumnCount = numCols + viewColumnDelta;
            byte[] encodedColumnCount = new byte[PInteger.INSTANCE.getByteSize()];
            PInteger.INSTANCE.getCodec().encodeInt(updatedColumnCount, encodedColumnCount, 0);
            headerPut.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                    PhoenixDatabaseMetaData.COLUMN_COUNT_BYTES, clientTimeStamp,
                    encodedColumnCount);
        }

        if (changeSequenceNumber) {
            long nextSequenceNumber = view.getSequenceNumber() + 1;
            byte[] encodedSequenceNumber = new byte[PLong.INSTANCE.getByteSize()];
            PLong.INSTANCE.getCodec().encodeLong(nextSequenceNumber, encodedSequenceNumber, 0);
            headerPut.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                    PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES, clientTimeStamp,
                    encodedSequenceNumber);

            // The header row put is only submitted, and the view only invalidated, when the
            // sequence number changes.
            mutationsForAddingColumnsToViews.add(headerPut);
            invalidateList.add(new ImmutableBytesPtr(viewKey));

            // Rewrite the ordinal position of every column tracked in the update list. The
            // list is expected to be non-empty only when the sequence number changes.
            int listIdx = 0;
            for (byte[] columnKey : ordinalPositionList.columnKeys) {
                byte[] encodedPosition = new byte[PInteger.INSTANCE.getByteSize()];
                PInteger.INSTANCE.getCodec().encodeInt(
                        ordinalPositionList.getOrdinalPositionFromListIdx(listIdx), encodedPosition, 0);
                Put ordinalPositionPut = new Put(columnKey, clientTimeStamp);
                ordinalPositionPut.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                        PhoenixDatabaseMetaData.ORDINAL_POSITION_BYTES, clientTimeStamp,
                        encodedPosition);
                mutationsForAddingColumnsToViews.add(ordinalPositionPut);
                listIdx++;
            }
        }

        if (view.rowKeyOrderOptimizable()) {
            UpgradeUtil.addRowKeyOrderOptimizableCell(mutationsForAddingColumnsToViews, viewKey, clientTimeStamp);
        }

        // The base table is being switched from non transactional to transactional: mark the
        // view as transactional too.
        boolean baseIsTransactional = basePhysicalTable.isTransactional();
        if (!baseIsTransactional
                && switchAttribute(basePhysicalTable, baseIsTransactional, tableMetadata, TRANSACTIONAL_BYTES)) {
            invalidateList.add(new ImmutableBytesPtr(viewKey));
            Put transactionalPut = new Put(viewKey);
            transactionalPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                    TRANSACTIONAL_BYTES, clientTimeStamp, PBoolean.INSTANCE.toBytes(true));
            mutationsForAddingColumnsToViews.add(transactionalPut);
        }
    }
    
    /**
     * Expression visitor that flags whether any key value, row key or projected column
     * expression it visits is equal to a given column expression.
     */
    private class ColumnFinder extends StatelessTraverseAllExpressionVisitor<Void> {
        /** The column expression being searched for. */
        private final Expression columnExpression;
        /** Becomes true once a visited expression equals {@link #columnExpression}. */
        private boolean columnFound = false;

        public ColumnFinder(Expression columnExpression) {
            this.columnExpression = columnExpression;
        }

        @Override
        public Void visit(KeyValueColumnExpression expression) {
            return recordIfTarget(expression);
        }

        @Override
        public Void visit(RowKeyColumnExpression expression) {
            return recordIfTarget(expression);
        }

        @Override
        public Void visit(ProjectedColumnExpression expression) {
            return recordIfTarget(expression);
        }

        /**
         * @return true if one of the visited column expressions matched the searched column
         */
        public boolean getColumnFound() {
            return columnFound;
        }

        // Remembers a match; a previously recorded match is never reset.
        private Void recordIfTarget(Expression visited) {
            columnFound |= visited.equals(columnExpression);
            return null;
        }
    }

    /**
     * Propagates the drop of base table columns to the child views of the base table.
     * <p>
     * For every view, each dropped base table column that also exists in the view is deleted
     * from the view, view indexes that need the column are dropped, and the view header row is
     * updated. Columns that a view does not have are skipped for that view.
     *
     * @param region region used to lock the view header rows
     * @param basePhysicalTable the base table the columns are being dropped from
     * @param locks list that collects the row locks acquired along the way
     * @param tableMetadata the mutations sent by the client for the base table
     * @param mutationsForAddingColumnsToViews receives the mutations generated for the views
     * @param schemaName schema name of the base table
     * @param tableName name of the base table
     * @param invalidateList receives the keys of the cache entries to invalidate
     * @param clientTimeStamp timestamp applied to the generated mutations
     * @param childViewsResult the child views of the base table
     * @param tableNamesToDelete passed through to the index drop
     * @param sharedTablesToDelete passed through to the index drop
     * @param clientVersion version of the client issuing the request
     * @return {@code null} when every view was processed, or an UNALLOWED_TABLE_MUTATION result
     *         if the WHERE clause of a view references one of the columns being dropped
     * @throws IOException if the Phoenix driver class needed for the server-side connection
     *         cannot be loaded, or if a lower level call fails
     * @throws SQLException if the view statement cannot be parsed or compiled
     */
    private MetaDataMutationResult dropColumnsFromChildViews(Region region,
            PTable basePhysicalTable, List<RowLock> locks, List<Mutation> tableMetadata,
            List<Mutation> mutationsForAddingColumnsToViews, byte[] schemaName, byte[] tableName,
            List<ImmutableBytesPtr> invalidateList, long clientTimeStamp,
            TableViewFinder childViewsResult, List<byte[]> tableNamesToDelete, List<SharedTableState> sharedTablesToDelete, int clientVersion)
            throws IOException, SQLException {
        List<Delete> columnDeletesForBaseTable = new ArrayList<>(tableMetadata.size());
        // Isolate the deletes that drop columns of the base table: they target a row key that
        // carries a column name and belongs to the base table.
        for (Mutation m : tableMetadata) {
            if (m instanceof Delete) {
                byte[][] rkmd = new byte[5][];
                int pkCount = getVarChars(m.getRow(), rkmd);
                if (pkCount > COLUMN_NAME_INDEX
                        && Bytes.compareTo(schemaName, rkmd[SCHEMA_NAME_INDEX]) == 0
                        && Bytes.compareTo(tableName, rkmd[TABLE_NAME_INDEX]) == 0) {
                    columnDeletesForBaseTable.add((Delete) m);
                }
            }
        }
        for (ViewInfo viewInfo : childViewsResult.getViewInfoList()) {
            // Kept as a negative count so that it can be passed on directly as a column delta.
            short numColsDeleted = 0;
            byte[] viewTenantId = viewInfo.getTenantId();
            byte[] viewSchemaName = viewInfo.getSchemaName();
            byte[] viewName = viewInfo.getViewName();
            byte[] viewKey = SchemaUtil.getTableKey(viewTenantId, viewSchemaName, viewName);

            // lock the rows corresponding to views so that no other thread can modify the view
            // meta-data
            RowLock viewRowLock = acquireLock(region, viewKey, locks);
            PTable view = doGetTable(viewKey, clientTimeStamp, viewRowLock, clientVersion);

            ColumnOrdinalPositionUpdateList ordinalPositionList =
                    new ColumnOrdinalPositionUpdateList();
            int numCols = view.getColumns().size();
            for (Delete columnDeleteForBaseTable : columnDeletesForBaseTable) {
                PColumn existingViewColumn = null;
                byte[][] rkmd = new byte[5][];
                getVarChars(columnDeleteForBaseTable.getRow(), rkmd);
                String columnName = Bytes.toString(rkmd[COLUMN_NAME_INDEX]);
                String columnFamily =
                        rkmd[FAMILY_NAME_INDEX] == null ? null : Bytes
                                .toString(rkmd[FAMILY_NAME_INDEX]);
                byte[] columnKey = getColumnKey(viewKey, columnName, columnFamily);
                try {
                    existingViewColumn =
                            columnFamily == null ? view.getColumnForColumnName(columnName) : view
                                    .getColumnFamily(columnFamily).getPColumnForColumnName(columnName);
                } catch (ColumnFamilyNotFoundException e) {
                    // ignore since it means that the view does not have the column family of
                    // the column to be dropped.
                } catch (ColumnNotFoundException e) {
                    // ignore since it means the column is not present in the view
                }

                if (existingViewColumn == null) {
                    // The view does not have this column, so there is nothing to drop from it.
                    continue;
                }

                // Reject the drop if the WHERE clause of the view references the column.
                if (view.getViewStatement() != null) {
                    ParseNode viewWhere =
                            new SQLParser(view.getViewStatement()).parseQuery().getWhere();
                    // The connection is only needed to compile the WHERE clause; close it as
                    // soon as the check is done.
                    try (PhoenixConnection conn =
                            QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(
                                PhoenixConnection.class)) {
                        PhoenixStatement statement = new PhoenixStatement(conn);
                        TableRef baseTableRef = new TableRef(basePhysicalTable);
                        ColumnResolver columnResolver = FromCompiler.getResolver(baseTableRef);
                        StatementContext context = new StatementContext(statement, columnResolver);
                        Expression whereExpression = WhereCompiler.compile(context, viewWhere);
                        Expression colExpression =
                                new ColumnRef(baseTableRef, existingViewColumn.getPosition())
                                        .newColumnExpression();
                        ColumnFinder columnFinder = new ColumnFinder(colExpression);
                        whereExpression.accept(columnFinder);
                        if (columnFinder.getColumnFound()) {
                            return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION,
                                    EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable);
                        }
                    } catch (ClassNotFoundException e) {
                        // Without a connection the WHERE clause cannot be checked; report the
                        // failure instead of carrying on without one.
                        throw new IOException(e);
                    }
                }

                --numColsDeleted;
                if (ordinalPositionList.size() == 0) {
                    // First column dropped from this view: seed the list with all the columns
                    // of the view before removing the dropped ones from it.
                    ordinalPositionList.setOffset(view.getBucketNum() == null ? 1 : 0);
                    for (PColumn col : view.getColumns()) {
                        ordinalPositionList.addColumn(getColumnKey(viewKey, col));
                    }
                }
                ordinalPositionList.dropColumn(columnKey);
                Delete viewColumnDelete = new Delete(columnKey, clientTimeStamp);
                mutationsForAddingColumnsToViews.add(viewColumnDelete);
                // drop any view indexes that need this column
                dropIndexes(view, region, invalidateList, locks, clientTimeStamp,
                    schemaName, view.getName().getBytes(),
                    mutationsForAddingColumnsToViews, existingViewColumn,
                    tableNamesToDelete, sharedTablesToDelete, clientVersion);
            }

            updateViewHeaderRow(basePhysicalTable, tableMetadata, mutationsForAddingColumnsToViews,
                invalidateList, clientTimeStamp, numColsDeleted, numColsDeleted, viewKey, view,
                ordinalPositionList, numCols, true);
        }
        return null;
    }
    
    /**
     * Checks whether a column being added to the base table is compatible with a column of the
     * same name that already exists in a view.
     *
     * @param existingViewColumn the column as defined in the view, or null if the view does not
     *            have it
     * @param columnToBeAdded the put describing the column being added to the base table
     * @param basePhysicalTable the base table the column is being added to
     * @param isColumnToBeAddPkCol whether the column being added is a pk column
     * @param view the view being checked
     * @return null if the view has no such column or the definitions are compatible, otherwise
     *         an UNALLOWED_TABLE_MUTATION result
     */
    private MetaDataMutationResult validateColumnForAddToBaseTable(PColumn existingViewColumn, Put columnToBeAdded, PTable basePhysicalTable, boolean isColumnToBeAddPkCol, PTable view) {
        if (existingViewColumn == null) {
            return null;
        }

        /*
         * With encoded column names, a key value column that already exists in a view cannot be
         * added to the base table. Column qualifiers for view columns are handed out from the
         * counters of the base physical table, so the same column name KV may be stored under
         * qualifier 11 by VIEW1 and under 12 by VIEW2. No qualifier works for a base table
         * column KV: a new one would make SELECT KV FROM BASETABLE return null for the rows
         * written through the views, while 11 or 12 would only surface the values written
         * through one of the two views.
         */
        boolean incompatible = EncodedColumnsUtil.usesEncodedColumnNames(basePhysicalTable)
                && !SchemaUtil.isPKColumn(existingViewColumn);

        // The data type has to match.
        if (!incompatible) {
            int baseDataType = getInteger(columnToBeAdded, TABLE_FAMILY_BYTES, DATA_TYPE_BYTES);
            incompatible = baseDataType != existingViewColumn.getDataType().getSqlType();
        }

        // The max length has to match; a missing max length counts as 0.
        if (!incompatible) {
            int baseMaxLength = getInteger(columnToBeAdded, TABLE_FAMILY_BYTES, COLUMN_SIZE_BYTES);
            int viewMaxLength = existingViewColumn.getMaxLength() == null ? 0 : existingViewColumn.getMaxLength();
            incompatible = baseMaxLength != viewMaxLength;
        }

        // The scale has to match; a missing scale counts as 0.
        if (!incompatible) {
            int baseScale = getInteger(columnToBeAdded, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES);
            int viewScale = existingViewColumn.getScale() == null ? 0 : existingViewColumn.getScale();
            incompatible = baseScale != viewScale;
        }

        // The sort order has to match.
        if (!incompatible) {
            int baseSortOrder = getInteger(columnToBeAdded, TABLE_FAMILY_BYTES, SORT_ORDER_BYTES);
            incompatible = baseSortOrder != existingViewColumn.getSortOrder().getSystemValue();
        }

        // For a pk column, the key slot position has to match as well.
        if (!incompatible && isColumnToBeAddPkCol) {
            List<Cell> keySeqCells = columnToBeAdded.get(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                    PhoenixDatabaseMetaData.KEY_SEQ_BYTES);
            if (keySeqCells != null && !keySeqCells.isEmpty()) {
                Cell keySeqCell = keySeqCells.get(0);
                int baseKeySeq = PSmallint.INSTANCE.getCodec().decodeInt(keySeqCell.getValueArray(),
                        keySeqCell.getValueOffset(), SortOrder.getDefault());
                int viewKeySeq = SchemaUtil.getPKPosition(view, existingViewColumn) + 1;
                incompatible = viewKeySeq != baseKeySeq;
            }
        }

        return incompatible
                ? new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION,
                        EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable)
                : null;
    }
    
    /**
     * Reads an integer from the first cell a put holds for the given family and qualifier.
     *
     * @param p the put to read from
     * @param family column family of the cell
     * @param qualifier column qualifier of the cell
     * @return the decoded value, or 0 if the put has no cell for the family and qualifier
     */
    private int getInteger(Put p, byte[] family, byte[] qualifier) {
        List<Cell> matches = p.get(family, qualifier);
        if (matches == null || matches.isEmpty()) {
            return 0;
        }
        Cell first = matches.get(0);
        Integer decoded = (Integer) PInteger.INSTANCE.toObject(first.getValueArray(),
                first.getValueOffset(), first.getValueLength());
        return decoded;
    }
    
    /**
     * Updates the header row of every index of a view once pk columns have been added to the
     * view: the sequence number of the index is incremented and its column count is increased
     * by the number of pk columns added. Does nothing if no pk column was added.
     *
     * @param mutationsForAddingColumnsToViews receives the generated mutations
     * @param invalidateList receives the header row keys of the indexes to invalidate
     * @param clientTimeStamp timestamp used for the row key order optimizable cell
     * @param view the view whose indexes are updated
     * @param deltaNumPkColsSoFar number of pk columns added to the view
     */
    private void addViewIndexesHeaderRowMutations(List<Mutation> mutationsForAddingColumnsToViews,
            List<ImmutableBytesPtr> invalidateList, long clientTimeStamp, PTable view, short deltaNumPkColsSoFar) {
        if (deltaNumPkColsSoFar <= 0) {
            return;
        }
        for (PTable viewIndex : view.getIndexes()) {
            byte[] headerRowKey = getViewIndexHeaderRowKey(viewIndex);
            Put headerRowPut = new Put(headerRowKey);

            // bump the sequence number of the index
            byte[] encodedSequenceNumber = new byte[PLong.INSTANCE.getByteSize()];
            long nextSequenceNumber = viewIndex.getSequenceNumber() + 1;
            PLong.INSTANCE.getCodec().encodeLong(nextSequenceNumber, encodedSequenceNumber, 0);
            headerRowPut.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                    PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES, encodedSequenceNumber);

            // account for the pk columns in the column count of the index
            byte[] encodedColumnCount = new byte[PInteger.INSTANCE.getByteSize()];
            int updatedColumnCount = viewIndex.getColumns().size() + deltaNumPkColsSoFar;
            PInteger.INSTANCE.getCodec().encodeInt(updatedColumnCount, encodedColumnCount, 0);
            headerRowPut.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                    PhoenixDatabaseMetaData.COLUMN_COUNT_BYTES, encodedColumnCount);

            // invalidating the index header row forces clients to fetch the latest meta-data
            invalidateList.add(new ImmutableBytesPtr(headerRowKey));
            if (viewIndex.rowKeyOrderOptimizable()) {
                UpgradeUtil.addRowKeyOrderOptimizableCell(mutationsForAddingColumnsToViews, headerRowKey, clientTimeStamp);
            }
            mutationsForAddingColumnsToViews.add(headerRowPut);
        }
    }

    /**
     * Adds, for every index of a view, the column definition row of a pk column that is being
     * added to the view.
     * <p>
     * The index column takes its data type (converted to the index specific type), scale, size,
     * sort order and data table name from the column definition of the view. It is always
     * nullable and is placed after the existing columns of the index, both in ordinal position
     * and in key sequence.
     *
     * @param mutationsForAddingColumnsToViews receives the generated column definition puts
     * @param clientTimeStamp timestamp of the generated puts
     * @param view the view the pk column is being added to
     * @param deltaNumPkColsSoFar number of pk columns added to the view so far, including this one
     * @param viewPkColumnName name of the pk column in the view
     * @param viewColumnDefinitionPut the column definition put of the pk column in the view
     */
    private void addMutationsForAddingPkColsToViewIndexes(List<Mutation> mutationsForAddingColumnsToViews, long clientTimeStamp,
            PTable view, short deltaNumPkColsSoFar, String viewPkColumnName, Put viewColumnDefinitionPut) {
        for (PTable index : view.getIndexes()) {
            int oldNumberOfColsInIndex = index.getColumns().size();
            
            byte[] indexColumnKey = ByteUtil.concat(getViewIndexHeaderRowKey(index), QueryConstants.SEPARATOR_BYTE_ARRAY, Bytes.toBytes(IndexUtil.getIndexColumnName(null, viewPkColumnName)));
            Put indexColumnDefinitionPut = new Put(indexColumnKey, clientTimeStamp);
            
            // Set the index specific data type for the column
            int viewPkColumnDataType = getInteger(viewColumnDefinitionPut, TABLE_FAMILY_BYTES, DATA_TYPE_BYTES);
            byte[] indexColumnDataTypeBytes = new byte[PInteger.INSTANCE.getByteSize()];
            int indexColumnDataType = IndexUtil.getIndexColumnDataType(true,
                    PDataType.fromTypeId(viewPkColumnDataType)).getSqlType();
            PInteger.INSTANCE.getCodec().encodeInt(indexColumnDataType, indexColumnDataTypeBytes, 0);
            indexColumnDefinitionPut.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                    PhoenixDatabaseMetaData.DATA_TYPE_BYTES, indexColumnDataTypeBytes);
            
            // Carry over precision, size, sort order and data table name from the view column.
            copyTableFamilyCellValue(viewColumnDefinitionPut, indexColumnDefinitionPut,
                    PhoenixDatabaseMetaData.DECIMAL_DIGITS_BYTES);
            copyTableFamilyCellValue(viewColumnDefinitionPut, indexColumnDefinitionPut,
                    PhoenixDatabaseMetaData.COLUMN_SIZE_BYTES);
            copyTableFamilyCellValue(viewColumnDefinitionPut, indexColumnDefinitionPut,
                    PhoenixDatabaseMetaData.SORT_ORDER_BYTES);
            copyTableFamilyCellValue(viewColumnDefinitionPut, indexColumnDefinitionPut,
                    PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES);
            
            // Set the ordinal position of the new column.
            byte[] ordinalPositionBytes = new byte[PInteger.INSTANCE.getByteSize()];
            int ordinalPositionOfNewCol = oldNumberOfColsInIndex + deltaNumPkColsSoFar;
            PInteger.INSTANCE.getCodec().encodeInt(ordinalPositionOfNewCol, ordinalPositionBytes, 0);
            indexColumnDefinitionPut.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                        PhoenixDatabaseMetaData.ORDINAL_POSITION_BYTES, ordinalPositionBytes);
            
            // New PK columns have to be nullable after the first DDL
            byte[] isNullableBytes = PInteger.INSTANCE.toBytes(ResultSetMetaData.columnNullable);
            indexColumnDefinitionPut.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                        PhoenixDatabaseMetaData.NULLABLE_BYTES, isNullableBytes);
            
            // Set the key sequence for the pk column to be added
            short currentKeySeq = SchemaUtil.getMaxKeySeq(index);
            short newKeySeq = (short)(currentKeySeq + deltaNumPkColsSoFar);
            byte[] keySeqBytes = new byte[PSmallint.INSTANCE.getByteSize()];
            PSmallint.INSTANCE.getCodec().encodeShort(newKeySeq, keySeqBytes, 0);
            indexColumnDefinitionPut.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                    PhoenixDatabaseMetaData.KEY_SEQ_BYTES, keySeqBytes);
            
            mutationsForAddingColumnsToViews.add(indexColumnDefinitionPut);
        }
    }

    /**
     * Copies the value of the first table family cell stored under {@code qualifier} from
     * {@code source} to {@code target}. Does nothing if {@code source} has no such cell.
     */
    private void copyTableFamilyCellValue(Put source, Put target, byte[] qualifier) {
        List<Cell> cells = source.get(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, qualifier);
        if (cells != null && !cells.isEmpty()) {
            // Cell#getValueArray() returns the whole backing array of the cell, of which the
            // value is only a slice, so the value bytes have to be cloned out of it.
            target.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, qualifier,
                    CellUtil.cloneValue(cells.get(0)));
        }
    }
    
    /**
     * Builds the header row key of a view index: tenant id, schema name and table name joined by
     * the separator byte. A missing tenant id or schema name contributes an empty byte array.
     *
     * @param index the view index
     * @return the header row key of the index
     */
    private byte[] getViewIndexHeaderRowKey(PTable index) {
        byte[] tenantPart = EMPTY_BYTE_ARRAY;
        if (index.getKey().getTenantId() != null) {
            tenantPart = index.getKey().getTenantId().getBytes();
        }
        byte[] schemaPart = EMPTY_BYTE_ARRAY;
        if (index.getSchemaName() != null) {
            schemaPart = index.getSchemaName().getBytes();
        }
        byte[] tablePart = index.getTableName().getBytes();
        return ByteUtil.concat(tenantPart, SEPARATOR_BYTE_ARRAY, schemaPart, SEPARATOR_BYTE_ARRAY, tablePart);
    }
  
    
    /**
     * Determines whether or not we have a change that needs to be propagated from a base table
     * to its views. For example, a change to GUIDE_POSTS_WIDTH does not need to be propagated
     * since it's only set on the physical table.
     * <p>
     * A mutation of a column row of the table always has to be propagated. For any other
     * mutation the cell qualifiers are matched against the table properties: a property that
     * propagates to views means there are changes to propagate, while mutations that only carry
     * properties that do not propagate mean there are none. When no qualifier maps to a table
     * property, the changes are assumed to need propagation.
     *
     * @param table the table being altered
     * @param rowKeyMetaData the filled in values for schemaName and tableName; not modified
     * @param tableMetaData the metadata passed over from the client
     * @return true if changes need to be propagated to the views and false otherwise.
     */
    private static boolean hasChangesToPropagate(PTable table, byte[][] rowKeyMetaData, List<Mutation> tableMetaData) {
        boolean hasChangesToPropagate = true;
        byte[] schemaName = rowKeyMetaData[SCHEMA_NAME_INDEX];
        byte[] tableName = rowKeyMetaData[TABLE_NAME_INDEX];
        for (Mutation m : tableMetaData) {
            // Parse the row key into a scratch array so that the array of the caller keeps the
            // values it was filled in with.
            byte[][] mutationRowKeyMetaData = new byte[rowKeyMetaData.length][];
            int pkCount = getVarChars(m.getRow(), mutationRowKeyMetaData);
            // Only a row key that goes beyond the table name identifies a column row. The
            // header row of the table must not match here, since it is the row whose cells
            // carry the table properties inspected below.
            if (pkCount > COLUMN_NAME_INDEX
                    && Bytes.compareTo(schemaName, mutationRowKeyMetaData[SCHEMA_NAME_INDEX]) == 0
                    && Bytes.compareTo(tableName, mutationRowKeyMetaData[TABLE_NAME_INDEX]) == 0) {
                return true;
            }
            for (List<Cell> cells : m.getFamilyCellMap().values()) {
                if (cells == null) {
                    continue;
                }
                for (Cell cell : cells) {
                    String columnName = Bytes.toString(CellUtil.cloneQualifier(cell));
                    try {
                        // Often Phoenix table properties aren't valid to be set on a view so thus
                        // do not need to be propagated. Here we check if the column name corresponds
                        // to a table property and whether that property is valid to set on a view.
                        TableProperty tableProp = TableProperty.valueOf(columnName);
                        if (tableProp.propagateToViews()) {
                            return true;
                        }
                        hasChangesToPropagate = false;
                    } catch (IllegalArgumentException ignored) {
                        // The qualifier is not a table property, so it has no say in the decision.
                    }
                }
            }
        }
        return hasChangesToPropagate;
    }


    /**
     * RPC endpoint that adds columns to, and/or changes properties of, a table or view.
     * <p>
     * The request's mutations are handed to {@code mutateColumn} together with a
     * {@code ColumnMutator} callback that:
     * <ol>
     * <li>invokes the {@code preAlterTable} coprocessor hook;</li>
     * <li>for TABLE/SYSTEM tables whose change has to be propagated, looks up the child views
     * and either rejects the request or collects the extra mutations for those views;</li>
     * <li>for views using encoded column names, invalidates the cache entry of the view's
     * physical table row;</li>
     * <li>walks the client mutations, rejecting columns that already exist and collecting the
     * cache keys that must be invalidated.</li>
     * </ol>
     * The callback returns {@code null} when it found nothing that blocks the change. A
     * non-null result of {@code mutateColumn} is sent back through {@code done}; any failure
     * is logged and reported through {@code controller}.
     */
    @Override
    public void addColumn(RpcController controller, final AddColumnRequest request,
            RpcCallback<MetaDataResponse> done) {
        try {
            List<Mutation> tableMetaData = ProtobufUtil.getMutations(request);
            MetaDataMutationResult result = mutateColumn(tableMetaData, new ColumnMutator() {
                @Override
                public MetaDataMutationResult updateMutation(PTable table, byte[][] rowKeyMetaData,
                        List<Mutation> tableMetaData, Region region, List<ImmutableBytesPtr> invalidateList,
                        List<RowLock> locks, long clientTimeStamp) throws IOException, SQLException {
                    // Capture the key parts of the table being altered up front: getVarChars()
                    // below refills rowKeyMetaData from every mutation's row key.
                    byte[] tenantId = rowKeyMetaData[TENANT_ID_INDEX];
                    byte[] schemaName = rowKeyMetaData[SCHEMA_NAME_INDEX];
                    byte[] tableName = rowKeyMetaData[TABLE_NAME_INDEX];
                    PTableType type = table.getType();
                    byte[] tableHeaderRowKey = SchemaUtil.getTableKey(tenantId,
                            schemaName, tableName);
                    byte[] cPhysicalTableName = table.getPhysicalName().getBytes();
                    getCoprocessorHost().preAlterTable(Bytes.toString(tenantId),
                            SchemaUtil.getTableName(schemaName, tableName), TableName.valueOf(cPhysicalTableName),
                            getParentPhysicalTableName(table), type);

                    // Size for worst case - all new columns are PK column
                    List<Mutation> mutationsForAddingColumnsToViews = Lists.newArrayListWithExpectedSize(tableMetaData.size() * ( 1 + table.getIndexes().size()));
                    if (type == PTableType.TABLE || type == PTableType.SYSTEM) {
                        // If change doesn't need to be propagated, don't bother finding children
                        if (hasChangesToPropagate(table, rowKeyMetaData, tableMetaData)) {
                            TableViewFinder childViewsResult = new TableViewFinder();
                            findAllChildViews(region, tenantId, table, childViewsResult, clientTimeStamp, request.getClientVersion());
                            if (childViewsResult.hasViews()) {
                                /*
                                 * Dis-allow if:
                                 * 1) The meta-data for child view/s spans over
                                 * more than one region (since the changes cannot be made in a transactional fashion)
                                 *
                                 * 2) The base column count is 0 which means that the metadata hasn't been upgraded yet or
                                 * the upgrade is currently in progress.
                                 *
                                 * 3) If the request is from a client that is older than 4.5 version of phoenix.
                                 * Starting from 4.5, metadata requests have the client version included in them.
                                 * We don't want to allow clients before 4.5 to add a column to the base table if it has views.
                                 *
                                 * 4) Trying to switch tenancy of a table that has views
                                 */
                                if (!childViewsResult.allViewsInSingleRegion()
                                        || table.getBaseColumnCount() == 0
                                        || !request.hasClientVersion()
                                        || switchAttribute(table, table.isMultiTenant(), tableMetaData, MULTI_TENANT_BYTES)) {
                                    return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION,
                                            EnvironmentEdgeManager.currentTimeMillis(), null);
                                } else {
                                    mutationsForAddingColumnsToViews = new ArrayList<>(childViewsResult.getViewInfoList().size() * tableMetaData.size());
                                    MetaDataMutationResult mutationResult = addColumnsAndTablePropertiesToChildViews(table, tableMetaData, mutationsForAddingColumnsToViews, schemaName, tableName, invalidateList, clientTimeStamp,
                                            childViewsResult, region, locks, request.getClientVersion());
                                    // return if we were not able to add the column successfully
                                    if (mutationResult != null) {
                                        return mutationResult;
                                    }
                                }
                            }
                        }
                    } else if (type == PTableType.VIEW
                            && EncodedColumnsUtil.usesEncodedColumnNames(table)) {
                        /*
                         * When adding a column to a view that uses encoded column name scheme, we
                         * need to modify the CQ counters stored in the view's physical table. So to
                         * make sure clients get the latest PTable, we need to invalidate the cache
                         * entry.
                         */
                        invalidateList.add(new ImmutableBytesPtr(MetaDataUtil
                                .getPhysicalTableRowForView(table)));
                    }
                    for (Mutation m : tableMetaData) {
                        byte[] key = m.getRow();
                        boolean addingPKColumn = false;
                        int pkCount = getVarChars(key, rowKeyMetaData);
                        if (pkCount > COLUMN_NAME_INDEX
                                && Bytes.compareTo(schemaName, rowKeyMetaData[SCHEMA_NAME_INDEX]) == 0
                                && Bytes.compareTo(tableName, rowKeyMetaData[TABLE_NAME_INDEX]) == 0) {
                            // Column row of the table being altered: the lookups below throw when
                            // the column (family) does not exist yet, which is the expected case.
                            try {
                                if (pkCount > FAMILY_NAME_INDEX
                                        && rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX].length > 0) {
                                    PColumnFamily family =
                                            table.getColumnFamily(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
                                    family.getPColumnForColumnNameBytes(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
                                } else if (rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length > 0) {
                                    addingPKColumn = true;
                                    // Decode with Bytes.toString (UTF-8) instead of the platform
                                    // default charset, so that a non-ASCII PK column name is looked
                                    // up correctly regardless of the JVM's file.encoding.
                                    table.getPKColumn(Bytes.toString(
                                            rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]));
                                } else {
                                    continue;
                                }
                                // Neither lookup threw, so the column is already there.
                                return new MetaDataMutationResult(
                                        MutationCode.COLUMN_ALREADY_EXISTS, EnvironmentEdgeManager
                                        .currentTimeMillis(), table);
                            } catch (ColumnFamilyNotFoundException e) {
                                continue;
                            } catch (ColumnNotFoundException e) {
                                if (addingPKColumn) {
                                    // We may be adding a DESC column, so if table is already
                                    // able to be rowKeyOptimized, it should continue to be so.
                                    if (table.rowKeyOrderOptimizable()) {
                                        UpgradeUtil.addRowKeyOrderOptimizableCell(mutationsForAddingColumnsToViews, tableHeaderRowKey, clientTimeStamp);
                                    } else if (table.getType() == PTableType.VIEW) {
                                        // Don't allow view PK to diverge from table PK as our upgrade code
                                        // does not handle this.
                                        return new MetaDataMutationResult(
                                                MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager
                                                .currentTimeMillis(), null);
                                    }
                                    // Add all indexes to invalidate list, as they will all be
                                    // adding the same PK column. No need to lock them, as we
                                    // have the parent table lock at this point.
                                    for (PTable index : table.getIndexes()) {
                                        invalidateList.add(new ImmutableBytesPtr(SchemaUtil
                                                .getTableKey(tenantId, index.getSchemaName()
                                                        .getBytes(), index.getTableName()
                                                        .getBytes())));
                                        // We may be adding a DESC column, so if index is already
                                        // able to be rowKeyOptimized, it should continue to be so.
                                        if (index.rowKeyOrderOptimizable()) {
                                            byte[] indexHeaderRowKey = SchemaUtil.getTableKey(index.getTenantId() == null ? ByteUtil.EMPTY_BYTE_ARRAY : index.getTenantId().getBytes(),
                                                    index.getSchemaName().getBytes(), index.getTableName().getBytes());
                                            UpgradeUtil.addRowKeyOrderOptimizableCell(mutationsForAddingColumnsToViews, indexHeaderRowKey, clientTimeStamp);
                                        }
                                    }
                                }
                                continue;
                            }
                        } else if (pkCount == COLUMN_NAME_INDEX &&
                                   ! (Bytes.compareTo(schemaName, rowKeyMetaData[SCHEMA_NAME_INDEX]) == 0 &&
                                      Bytes.compareTo(tableName, rowKeyMetaData[TABLE_NAME_INDEX]) == 0 ) ) {
                            // Header row of some other table: invalidate any table with mutations
                            // TODO: this likely means we don't need the above logic that
                            // loops through the indexes if adding a PK column, since we'd
                            // always have header rows for those.
                            invalidateList.add(new ImmutableBytesPtr(SchemaUtil
                                    .getTableKey(tenantId,
                                            rowKeyMetaData[SCHEMA_NAME_INDEX],
                                            rowKeyMetaData[TABLE_NAME_INDEX])));
                        }
                    }
                    tableMetaData.addAll(mutationsForAddingColumnsToViews);
                    return null;
                }
            }, request.getClientVersion());
            if (result != null) {
                done.run(MetaDataMutationResult.toProto(result));
            }
        } catch (Throwable e) {
            logger.error("Add column failed: ", e);
            ProtobufUtil.setControllerException(controller,
                ServerUtil.createIOException("Error when adding column: ", e));
        }
    }

    /**
     * Convenience overload of {@link #doGetTable(byte[], long, RowLock, int)} for callers that
     * have no row lock to hand over: {@code null} is passed as the lock.
     */
    private PTable doGetTable(byte[] key, long clientTimeStamp, int clientVersion) throws IOException, SQLException {
        final RowLock noExistingLock = null;
        return doGetTable(key, clientTimeStamp, noExistingLock, clientVersion);
    }

    /**
     * Returns the table stored under {@code key} as it should be seen at
     * {@code clientTimeStamp}, consulting the metadata cache first and building the table
     * otherwise.
     * <p>
     * If {@code rowLock} is {@code null} a lock on {@code key} is acquired for the duration of
     * the call and released before returning; a lock passed in by the caller is left untouched.
     *
     * @param key row key of the table header row
     * @param clientTimeStamp only versions with a time stamp below this value are returned
     *            from the cache or from the build of the latest version
     * @param rowLock lock already held by the caller on {@code key}, or {@code null}
     * @param clientVersion version of the requesting client, handed on to {@code buildTable}
     * @return the table; {@code null} if the cached table is flagged as deleted, or whatever
     *         {@code buildTable} yields for the client's time stamp
     */
    private PTable doGetTable(byte[] key, long clientTimeStamp, RowLock rowLock, int clientVersion) throws IOException, SQLException {
        ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
        Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
                GlobalCache.getInstance(this.env).getMetaDataCache();
        // Ask Lars about the expense of this call - if we don't take the lock, we still won't get
        // partial results
        // get the co-processor environment
        // TODO: check that key is within region.getStartKey() and region.getEndKey()
        // and return special code to force client to lookup region from meta.
        Region region = env.getRegion();
        /*
         * Lock directly on key, though it may be an index table. This will just prevent a table
         * from getting rebuilt too often.
         */
        final boolean wasLocked = (rowLock != null);
        if (!wasLocked) {
            rowLock = acquireLock(region, key, null);
        }
        try {
            // The lock is held at this point, so a single cache lookup is sufficient.
            PTable table = (PTable)metaDataCache.getIfPresent(cacheKey);
            // We only cache the latest, so we'll end up building the table with every call if the
            // client connection has specified an SCN.
            // TODO: If we indicate to the client that we're returning an older version, but there's a
            // newer version available, the client
            // can safely not call this, since we only allow modifications to the latest.
            if (table != null && table.getTimeStamp() < clientTimeStamp) {
                // The cached version is visible at the client's time stamp, so just return it
                return isTableDeleted(table) ? null : table;
            }
            // Query for the latest table first, since it's not cached
            table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP, clientVersion);
            // The null check has to guard both alternatives: buildTable may come back empty, and
            // dereferencing the result for the index-disable check would then throw.
            if (table != null
                    && (table.getTimeStamp() < clientTimeStamp
                            || (blockWriteRebuildIndex && table.getIndexDisableTimestamp() > 0))) {
                return table;
            }
            // Otherwise, query for an older version of the table - it won't be cached
            return buildTable(key, cacheKey, region, clientTimeStamp, clientVersion);
        } finally {
            if (!wasLocked) {
                rowLock.release();
            }
        }
    }

    /**
     * Resolves the functions identified by {@code keys}, taking them from the metadata cache
     * where a cached version older than {@code clientTimeStamp} exists and building the
     * remaining ones.
     * <p>
     * Note that {@code keys} is modified by this method: it is sorted, and every key that was
     * served from the cache is removed from it.
     *
     * @param keys row keys of the requested functions
     * @param clientTimeStamp upper bound (exclusive) for the time stamp of a usable cache entry
     * @return all requested functions, or {@code null} if a cached function is reported as
     *         deleted or if not every requested function could be resolved
     */
    private List<PFunction> doGetFunctions(List<byte[]> keys, long clientTimeStamp) throws IOException, SQLException {
        Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
                GlobalCache.getInstance(this.env).getMetaDataCache();
        Region region = env.getRegion();
        // Bring the keys into byte order before locking them one after the other.
        Collections.sort(keys, new Comparator<byte[]>() {
            @Override
            public int compare(byte[] left, byte[] right) {
                return Bytes.compareTo(left, right);
            }
        });
        final int requested = keys.size();
        /*
         * Lock directly on key. This will just prevent a function from getting rebuilt too
         * often.
         */
        List<RowLock> heldLocks = new ArrayList<RowLock>(requested);
        try {
            for (byte[] key : keys) {
                acquireLock(region, key, heldLocks);
            }

            List<PFunction> resolved = new ArrayList<PFunction>(requested);
            for (Iterator<byte[]> remaining = keys.iterator(); remaining.hasNext();) {
                byte[] key = remaining.next();
                PFunction cached = (PFunction)metaDataCache.getIfPresent(new FunctionBytesPtr(key));
                if (cached == null || cached.getTimeStamp() >= clientTimeStamp) {
                    // Not usable from the cache; leave the key in place to be built below.
                    continue;
                }
                if (isFunctionDeleted(cached)) {
                    return null;
                }
                resolved.add(cached);
                remaining.remove();
            }
            if (resolved.size() == requested) {
                return resolved;
            }

            // Build whatever could not be served from the cache
            List<PFunction> built =
                    buildFunctions(keys, region, clientTimeStamp, false,
                        Collections.<Mutation> emptyList());
            if (built == null || built.isEmpty()) {
                return null;
            }
            resolved.addAll(built);
            return resolved.size() == requested ? resolved : null;
        } finally {
            releaseRowLocks(region,heldLocks);
        }
    }

    /**
     * RPC endpoint that drops columns from a table or view.
     * <p>
     * The request's mutations are handed to {@code mutateColumn} together with a
     * {@code ColumnMutator} callback that:
     * <ol>
     * <li>invokes the {@code preAlterTable} coprocessor hook;</li>
     * <li>for TABLE/SYSTEM tables with child views, drops the columns from those views;</li>
     * <li>for every {@code Delete} of a column row of the table being altered, resolves the
     * column, marks a view as diverged when an inherited column is dropped, rejects columns
     * that are view referenced and drops the indexes that need the column;</li>
     * <li>rejects the request if a PK column is deleted from a table with a single PK
     * column.</li>
     * </ol>
     * Physical tables that have to be deleted as a consequence are collected in
     * {@code tableNamesToDelete} / {@code sharedTablesToDelete} and returned with the result.
     * A non-null result of {@code mutateColumn} is sent back through {@code done}; any failure
     * is logged and reported through {@code controller}.
     */
    @Override
    public void dropColumn(RpcController controller, final DropColumnRequest request,
            RpcCallback<MetaDataResponse> done) {
        List<Mutation> tableMetaData = null;
        final List<byte[]> tableNamesToDelete = Lists.newArrayList();
        final List<SharedTableState> sharedTablesToDelete = Lists.newArrayList();
        try {
            tableMetaData = ProtobufUtil.getMutations(request);
            MetaDataMutationResult result = mutateColumn(tableMetaData, new ColumnMutator() {
                @Override
                public MetaDataMutationResult updateMutation(PTable table, byte[][] rowKeyMetaData,
                        List<Mutation> tableMetaData, Region region,
                        List<ImmutableBytesPtr> invalidateList, List<RowLock> locks, long clientTimeStamp)
                        throws IOException, SQLException {
                    // Capture the key parts of the table being altered up front: getVarChars()
                    // below refills rowKeyMetaData from every mutation's row key.
                    byte[] tenantId = rowKeyMetaData[TENANT_ID_INDEX];
                    byte[] schemaName = rowKeyMetaData[SCHEMA_NAME_INDEX];
                    byte[] tableName = rowKeyMetaData[TABLE_NAME_INDEX];
                    boolean deletePKColumn = false;
                    getCoprocessorHost().preAlterTable(Bytes.toString(tenantId),
                            SchemaUtil.getTableName(schemaName, tableName),
                            TableName.valueOf(table.getPhysicalName().getBytes()),
                            getParentPhysicalTableName(table),table.getType());

                    List<Mutation> additionalTableMetaData = Lists.newArrayList();

                    PTableType type = table.getType();
                    if (type == PTableType.TABLE || type == PTableType.SYSTEM) {
                        TableViewFinder childViewsResult = new TableViewFinder();
                        findAllChildViews(region, tenantId, table, childViewsResult, clientTimeStamp, request.getClientVersion());
                        if (childViewsResult.hasViews()) {
                            MetaDataMutationResult mutationResult =
                                    dropColumnsFromChildViews(region, table,
                                        locks, tableMetaData, additionalTableMetaData,
                                        schemaName, tableName, invalidateList,
                                        clientTimeStamp, childViewsResult, tableNamesToDelete, sharedTablesToDelete, request.getClientVersion());
                            // return if we were not able to drop the column successfully
                            if (mutationResult != null) {
                                return mutationResult;
                            }
                        }
                    }

                    for (Mutation m : tableMetaData) {
                        if (m instanceof Delete) {
                            byte[] key = m.getRow();
                            int pkCount = getVarChars(key, rowKeyMetaData);
                            if (pkCount > COLUMN_NAME_INDEX
                                    && Bytes.compareTo(schemaName,
                                        rowKeyMetaData[SCHEMA_NAME_INDEX]) == 0
                                    && Bytes.compareTo(tableName, rowKeyMetaData[TABLE_NAME_INDEX]) == 0) {
                                PColumn columnToDelete = null;
                                try {
                                    if (pkCount > FAMILY_NAME_INDEX
                                            && rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX].length > 0) {
                                        PColumnFamily family =
                                                table.getColumnFamily(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
                                        columnToDelete =
                                                family.getPColumnForColumnNameBytes(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
                                    } else if (rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length > 0) {
                                        deletePKColumn = true;
                                        // Decode with Bytes.toString (UTF-8) instead of the platform
                                        // default charset, so that a non-ASCII PK column name is
                                        // looked up correctly regardless of the JVM's file.encoding.
                                        columnToDelete = table.getPKColumn(Bytes.toString(
                                          rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]));
                                    } else {
                                        continue;
                                    }
                                    if (table.getType() == PTableType.VIEW) {
                                        if (table.getBaseColumnCount() != DIVERGED_VIEW_BASE_COLUMN_COUNT
                                                && columnToDelete.getPosition() < table.getBaseColumnCount()) {
                                            /*
                                             * If the column being dropped is inherited from the base table, then the
                                             * view is about to diverge itself from the base table. The consequence of
                                             * this divergence is that any further meta-data changes made to the
                                             * base table will not be propagated to the hierarchy of views where this
                                             * view is the root.
                                             */
                                            byte[] viewKey = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
                                            Put updateBaseColumnCountPut = new Put(viewKey);
                                            byte[] baseColumnCountPtr = new byte[PInteger.INSTANCE.getByteSize()];
                                            PInteger.INSTANCE.getCodec().encodeInt(DIVERGED_VIEW_BASE_COLUMN_COUNT,
                                                    baseColumnCountPtr, 0);
                                            updateBaseColumnCountPut.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                                                    PhoenixDatabaseMetaData.BASE_COLUMN_COUNT_BYTES, clientTimeStamp,
                                                    baseColumnCountPtr);
                                            additionalTableMetaData.add(updateBaseColumnCountPut);
                                        }
                                    }
                                    if (columnToDelete.isViewReferenced()) { // Disallow deletion of column referenced in WHERE clause of view
                                        return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), table, columnToDelete);
                                    }
                                    // drop any indexes that need the column that is going to be dropped
                                    dropIndexes(table, region, invalidateList, locks,
                                        clientTimeStamp, schemaName, tableName,
                                        additionalTableMetaData, columnToDelete,
                                        tableNamesToDelete, sharedTablesToDelete, request.getClientVersion());
                                } catch (ColumnFamilyNotFoundException e) {
                                    return new MetaDataMutationResult(
                                            MutationCode.COLUMN_NOT_FOUND, EnvironmentEdgeManager
                                                    .currentTimeMillis(), table, columnToDelete);
                                } catch (ColumnNotFoundException e) {
                                    return new MetaDataMutationResult(
                                            MutationCode.COLUMN_NOT_FOUND, EnvironmentEdgeManager
                                                    .currentTimeMillis(), table, columnToDelete);
                                }
                            }
                        }
                    }
                    // A table must keep at least one PK column.
                    if (deletePKColumn && table.getPKColumns().size() == 1) {
                        return new MetaDataMutationResult(MutationCode.NO_PK_COLUMNS,
                                EnvironmentEdgeManager.currentTimeMillis(), null);
                    }
                    tableMetaData.addAll(additionalTableMetaData);
                    long currentTime = MetaDataUtil.getClientTimeStamp(tableMetaData);
                    return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, currentTime, null, tableNamesToDelete, sharedTablesToDelete);
                }
            }, request.getClientVersion());
            if (result != null) {
                done.run(MetaDataMutationResult.toProto(result));
            }
        } catch (Throwable e) {
            logger.error("Drop column failed: ", e);
            ProtobufUtil.setControllerException(controller,
                ServerUtil.createIOException("Error when dropping column: ", e));
        }
    }
    
    /**
     * Handles the indexes of {@code table} that are affected by dropping
     * {@code columnToDelete}.
     * <p>
     * An index that has the column among its indexed columns is dropped: it is locked, deletes
     * for its header row and for its parent link are added to {@code additionalTableMetaData},
     * {@code doDropTable} is invoked for it and its key is added to {@code invalidateList}. An
     * index that merely covers the column is only invalidated; covered columns are deleted
     * from the index by the client.
     * <p>
     * The server-side connection opened for building the index maintainers is closed before
     * this method returns.
     *
     * @param table the data table the column is dropped from
     * @param region region used for acquiring the index row locks
     * @param invalidateList receives the keys of the indexes whose cache entry must be invalidated
     * @param locks receives the row locks acquired on dropped indexes
     * @param clientTimeStamp time stamp used for the generated deletes
     * @param schemaName schema name of the data table
     * @param tableName name of the data table
     * @param additionalTableMetaData receives the generated mutations
     * @param columnToDelete the column being dropped
     * @param tableNamesToDelete handed on to {@code doDropTable}
     * @param sharedTablesToDelete handed on to {@code doDropTable}
     * @param clientVersion version of the requesting client, handed on to {@code doDropTable}
     */
    private void dropIndexes(PTable table, Region region, List<ImmutableBytesPtr> invalidateList,
            List<RowLock> locks, long clientTimeStamp, byte[] schemaName,
            byte[] tableName, List<Mutation> additionalTableMetaData, PColumn columnToDelete, 
            List<byte[]> tableNamesToDelete, List<SharedTableState> sharedTablesToDelete, int clientVersion)
            throws IOException, SQLException {
        // Without indexes there is nothing to inspect and no connection is needed.
        if (table.getIndexes().isEmpty()) {
            return;
        }
        // Look for columnToDelete in any indexes. If found as PK column, get lock and drop the
        // index and then invalidate it
        // Covered columns are deleted from the index by the client
        PhoenixConnection connection = null;
        try {
            try {
                connection = QueryUtil.getConnectionOnServer(
                    env.getConfiguration()).unwrap(PhoenixConnection.class);
            } catch (ClassNotFoundException e) {
                // Best effort: carry on without a connection, but leave a trace of the cause.
                logger.warn("Unable to open a server-side connection while dropping indexes", e);
            }
            // Both lookups depend on the dropped column only, not on the individual index.
            Pair<String, String> columnToDeleteInfo = new Pair<>(columnToDelete.getFamilyName().getString(), columnToDelete.getName().getString());
            ColumnReference colDropRef = new ColumnReference(columnToDelete.getFamilyName().getBytes(), columnToDelete.getColumnQualifierBytes());
            for (PTable index : table.getIndexes()) {
                byte[] tenantId = index.getTenantId() == null ? ByteUtil.EMPTY_BYTE_ARRAY : index.getTenantId().getBytes();
                IndexMaintainer indexMaintainer = index.getIndexMaintainer(table, connection);
                byte[] indexKey =
                        SchemaUtil.getTableKey(tenantId, index.getSchemaName().getBytes(), index
                                .getTableName().getBytes());
                boolean isColumnIndexed = indexMaintainer.getIndexedColumnInfo().contains(columnToDeleteInfo);
                boolean isCoveredColumn = indexMaintainer.getCoveredColumns().contains(colDropRef);
                // If index requires this column for its pk, then drop it
                if (isColumnIndexed) {
                    // Since we're dropping the index, lock it to ensure
                    // that a change in index state doesn't
                    // occur while we're dropping it.
                    acquireLock(region, indexKey, locks);
                    // Drop the index table. The doDropTable will expand
                    // this to all of the table rows and invalidate the
                    // index table
                    additionalTableMetaData.add(new Delete(indexKey, clientTimeStamp));
                    byte[] linkKey =
                            MetaDataUtil.getParentLinkKey(tenantId, schemaName, tableName, index
                                    .getTableName().getBytes());
                    // Drop the link between the data table and the
                    // index table
                    additionalTableMetaData.add(new Delete(linkKey, clientTimeStamp));
                    doDropTable(indexKey, tenantId, index.getSchemaName().getBytes(), index
                            .getTableName().getBytes(), tableName, index.getType(),
                        additionalTableMetaData, invalidateList, locks, tableNamesToDelete, sharedTablesToDelete, false, clientVersion);
                    invalidateList.add(new ImmutableBytesPtr(indexKey));
                }
                // If the dropped column is a covered index column, invalidate the index
                else if (isCoveredColumn) {
                    invalidateList.add(new ImmutableBytesPtr(indexKey));
                }
            }
        } finally {
            // The connection is only needed for building the index maintainers above.
            if (connection != null) {
                connection.close();
            }
        }
    }

    /**
     * RPC endpoint that invalidates every entry of the metadata cache and clears the tenant
     * cache. The value returned by {@code clearTenantCache()} is reported back to the caller
     * as the number of unfreed bytes.
     */
    @Override
    public void clearCache(RpcController controller, ClearCacheRequest request,
            RpcCallback<ClearCacheResponse> done) {
        final GlobalCache globalCache = GlobalCache.getInstance(this.env);
        GlobalCache.getInstance(this.env).getMetaDataCache().invalidateAll();
        final long bytesStillHeld = globalCache.clearTenantCache();
        ClearCacheResponse response =
                ClearCacheResponse.newBuilder().setUnfreedBytes(bytesStillHeld).build();
        done.run(response);
    }

    /**
     * Reports the encoded server version and, when SYSTEM.CATALOG can be loaded, its timestamp.
     * <p>
     * If namespace mapping is enabled and the client is older than
     * {@code MIN_NAMESPACE_MAPPED_PHOENIX_VERSION}, a {@link DoNotRetryIOException} is recorded
     * on the controller. A response is still built and handed to {@code done} in every case.
     *
     * @param controller RPC controller on which failures are recorded
     * @param request carries the client version
     * @param done callback that receives the {@link GetVersionResponse}
     */
    @Override
    public void getVersion(RpcController controller, GetVersionRequest request, RpcCallback<GetVersionResponse> done) {
        final GetVersionResponse.Builder response = GetVersionResponse.newBuilder();
        final Configuration configuration = env.getConfiguration();

        final boolean clientTooOld = isTablesMappingEnabled
                && MetaDataProtocol.MIN_NAMESPACE_MAPPED_PHOENIX_VERSION > request.getClientVersion();
        if (clientTooOld) {
            final String incompatibleMessage =
                    "Old client is not compatible when system tables are upgraded to map to namespace";
            logger.error(incompatibleMessage);
            final String physicalCatalogName = SchemaUtil.getPhysicalTableName(
                    PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, isTablesMappingEnabled).toString();
            ProtobufUtil.setControllerException(controller,
                    ServerUtil.createIOException(physicalCatalogName,
                            new DoNotRetryIOException(incompatibleMessage)));
        }
        final long encodedVersion = MetaDataUtil.encodeVersion(env.getHBaseVersion(), configuration);

        final byte[] catalogKey = SchemaUtil.getTableKey(
                ByteUtil.EMPTY_BYTE_ARRAY,
                PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA_BYTES,
                PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE_BYTES);
        final ImmutableBytesPtr catalogCacheKey = new ImmutableBytesPtr(catalogKey);
        PTable catalogTable = null;
        try {
            catalogTable = loadTable(env, catalogKey, catalogCacheKey, MIN_SYSTEM_TABLE_TIMESTAMP,
                    HConstants.LATEST_TIMESTAMP, request.getClientVersion());
        } catch (Throwable failure) {
            logger.error("loading system catalog table inside getVersion failed", failure);
            final String physicalCatalogName = SchemaUtil.getPhysicalTableName(
                    PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, isTablesMappingEnabled).toString();
            ProtobufUtil.setControllerException(controller,
                    ServerUtil.createIOException(physicalCatalogName, failure));
        }
        // On the very first connection the system catalog does not exist yet, so the optional
        // timestamp field is simply left unset.
        if (catalogTable != null) {
            response.setSystemCatalogTimestamp(catalogTable.getTimeStamp());
        }
        response.setVersion(encodedVersion);
        done.run(response.build());
    }

    /**
     * Applies an index state change (and optionally a new INDEX_DISABLE_TIMESTAMP) to the
     * header row of an index table.
     * <p>
     * The tenant id, schema name and table name are taken from the row key of the supplied
     * mutations. The new state and the optional disable timestamp are read from the
     * TABLE_FAMILY cells of the first mutation. The current values are read from the region
     * under a row lock, invalid transitions are rejected with UNALLOWED_TABLE_MUTATION, and
     * otherwise the (possibly adjusted) mutations are written and the affected cache entries
     * are invalidated.
     * <p>
     * Response codes set by this method: TABLE_NOT_IN_REGION (key is not hosted by this
     * region), TABLE_NOT_FOUND (no header row / table could not be loaded),
     * UNALLOWED_TABLE_MUTATION (rejected transition or stale rebuild), and
     * TABLE_ALREADY_EXISTS on the normal completion path.
     *
     * @param controller RPC controller; any Throwable raised is recorded on it
     * @param request carries the mutations and the client version
     * @param done callback that receives the {@link MetaDataResponse}
     */
    @Override
    public void updateIndexState(RpcController controller, UpdateIndexStateRequest request,
            RpcCallback<MetaDataResponse> done) {
        MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
        // Declared outside the try so the catch block can name the table in its error.
        byte[] schemaName = null;
        byte[] tableName = null;
        try {
            byte[][] rowKeyMetaData = new byte[3][];
            List<Mutation> tableMetadata = ProtobufUtil.getMutations(request);
            MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
            byte[] tenantId = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
            schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
            tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
            final byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
            Region region = env.getRegion();
            // A non-null result means the key is not served by this region.
            MetaDataMutationResult result = checkTableKeyInRegion(key, region);
            if (result != null) {
                done.run(MetaDataMutationResult.toProto(result));
                return;
            }
            long timeStamp = HConstants.LATEST_TIMESTAMP;
            ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
            List<Cell> newKVs = tableMetadata.get(0).getFamilyCellMap().get(TABLE_FAMILY_BYTES);
            Cell newKV = null;
            int disableTimeStampKVIndex = -1;
            int indexStateKVIndex = 0;
            int index = 0;
            // Locate the INDEX_STATE cell (new state + its timestamp) and, if present, the
            // position of the INDEX_DISABLE_TIMESTAMP cell within the incoming cells.
            for(Cell cell : newKVs) {
                if(Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
                      INDEX_STATE_BYTES, 0, INDEX_STATE_BYTES.length) == 0){
                  newKV = cell;
                  indexStateKVIndex = index;
                  timeStamp = cell.getTimestamp();
                } else if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
                  INDEX_DISABLE_TIMESTAMP_BYTES, 0, INDEX_DISABLE_TIMESTAMP_BYTES.length) == 0) {
                  disableTimeStampKVIndex = index;
                }
                index++;
            }
            // NOTE(review): newKV stays null when the mutation carries no INDEX_STATE cell; the
            // dereference below would then throw and be reported through the outer catch.
            PIndexState newState =
                    PIndexState.fromSerializedValue(newKV.getValueArray()[newKV.getValueOffset()]);
            RowLock rowLock = acquireLock(region, key, null);
            if (rowLock == null) {
                throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key));
            }
            try {
                // Read the current header-row values that drive the transition checks.
                Get get = new Get(key);
                get.addColumn(TABLE_FAMILY_BYTES, DATA_TABLE_NAME_BYTES);
                get.addColumn(TABLE_FAMILY_BYTES, INDEX_STATE_BYTES);
                get.addColumn(TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES);
                get.addColumn(TABLE_FAMILY_BYTES, ROW_KEY_ORDER_OPTIMIZABLE_BYTES);
                Result currentResult = region.get(get);
                if (currentResult.rawCells().length == 0) {
                    builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
                    builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                    done.run(builder.build());
                    return;
                }
                Cell dataTableKV = currentResult.getColumnLatestCell(TABLE_FAMILY_BYTES, DATA_TABLE_NAME_BYTES);
                Cell currentStateKV = currentResult.getColumnLatestCell(TABLE_FAMILY_BYTES, INDEX_STATE_BYTES);
                Cell currentDisableTimeStamp = currentResult.getColumnLatestCell(TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES);
                // Only the presence of the cell matters here, not its value.
                boolean rowKeyOrderOptimizable = currentResult.getColumnLatestCell(TABLE_FAMILY_BYTES, ROW_KEY_ORDER_OPTIMIZABLE_BYTES) != null;

                //check permission on data table
                long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
                PTable loadedTable = getTable(env, key, new ImmutableBytesPtr(key), clientTimeStamp, clientTimeStamp,
                        request.getClientVersion());
                if (loadedTable == null) {
                    builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
                    builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                    done.run(builder.build());
                    return;
                }
                getCoprocessorHost().preIndexUpdate(Bytes.toString(tenantId),
                        SchemaUtil.getTableName(schemaName, tableName),
                        TableName.valueOf(loadedTable.getPhysicalName().getBytes()),
                        getParentPhysicalTableName(loadedTable),
                        newState);

                // NOTE(review): currentStateKV is dereferenced without a null check; a header
                // row lacking an INDEX_STATE cell would fail here and surface via the outer catch.
                PIndexState currentState =
                        PIndexState.fromSerializedValue(currentStateKV.getValueArray()[currentStateKV
                                .getValueOffset()]);
                // Timestamp of INDEX_STATE gets updated with each call
                long actualTimestamp = currentStateKV.getTimestamp();
                long curTimeStampVal = 0;
                long newDisableTimeStamp = 0;
                if ((currentDisableTimeStamp != null && currentDisableTimeStamp.getValueLength() > 0)) {
                    curTimeStampVal = (Long) PLong.INSTANCE.toObject(currentDisableTimeStamp.getValueArray(),
                            currentDisableTimeStamp.getValueOffset(), currentDisableTimeStamp.getValueLength());
                    // new DisableTimeStamp is passed in
                    if (disableTimeStampKVIndex >= 0) {
                        Cell newDisableTimeStampCell = newKVs.get(disableTimeStampKVIndex);
                        long expectedTimestamp = newDisableTimeStampCell.getTimestamp();
                        // If the index status has been updated after the upper bound of the scan we use
                        // to partially rebuild the index, then we need to fail the rebuild because an
                        // index write failed before the rebuild was complete.
                        if (actualTimestamp > expectedTimestamp) {
                            builder.setReturnCode(MetaDataProtos.MutationCode.UNALLOWED_TABLE_MUTATION);
                            builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                            done.run(builder.build());
                            return;
                        }
                        newDisableTimeStamp = (Long) PLong.INSTANCE.toObject(newDisableTimeStampCell.getValueArray(),
                                newDisableTimeStampCell.getValueOffset(), newDisableTimeStampCell.getValueLength());
                        // We use the sign of the INDEX_DISABLE_TIMESTAMP to differentiate the keep-index-active (negative)
                        // from block-writes-to-data-table case. In either case, we want to keep the oldest timestamp to
                        // drive the partial index rebuild rather than update it with each attempt to update the index
                        // when a new data table write occurs.
                        // We do legitimately move the INDEX_DISABLE_TIMESTAMP to be newer when we're rebuilding the
                        // index in which case the state will be INACTIVE or PENDING_ACTIVE.
                        if (curTimeStampVal != 0 
                                && (newState == PIndexState.DISABLE || newState == PIndexState.PENDING_ACTIVE || newState == PIndexState.PENDING_DISABLE)
                                && Math.abs(curTimeStampVal) < Math.abs(newDisableTimeStamp)) {
                            // do not reset disable timestamp as we want to keep the min
                            // Removing the cell from newKVs drops it from the mutation that is
                            // written below; -1 marks "no disable timestamp update" from here on.
                            newKVs.remove(disableTimeStampKVIndex);
                            disableTimeStampKVIndex = -1;
                        }
                    }
                }
                // Detect invalid transitions
                if (currentState == PIndexState.BUILDING) {
                    if (newState == PIndexState.USABLE) {
                        builder.setReturnCode(MetaDataProtos.MutationCode.UNALLOWED_TABLE_MUTATION);
                        builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                        done.run(builder.build());
                        return;
                    }
                } else if (currentState == PIndexState.DISABLE) {
                    // Can't transition back to INACTIVE if INDEX_DISABLE_TIMESTAMP is 0
                    if (newState != PIndexState.BUILDING && newState != PIndexState.DISABLE &&
                        (newState != PIndexState.INACTIVE || curTimeStampVal == 0)) {
                        builder.setReturnCode(MetaDataProtos.MutationCode.UNALLOWED_TABLE_MUTATION);
                        builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                        done.run(builder.build());
                        return;
                    }
                    // Done building, but was disable before that, so that in disabled state
                    if (newState == PIndexState.ACTIVE) {
                        newState = PIndexState.DISABLE;
                    }
                    // Can't transition from DISABLE to PENDING_DISABLE
                    if (newState == PIndexState.PENDING_DISABLE) {
                        builder.setReturnCode(MetaDataProtos.MutationCode.UNALLOWED_TABLE_MUTATION);
                        builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                        done.run(builder.build());
                        return;
                    }
                }

                // While BUILDING (and not finishing as ACTIVE) reuse the timestamp of the
                // currently stored INDEX_STATE cell for any cell rewritten below.
                if (currentState == PIndexState.BUILDING && newState != PIndexState.ACTIVE) {
                    timeStamp = currentStateKV.getTimestamp();
                }
                // UNUSABLE and USABLE are mapped onto INACTIVE / ACTIVE here; the INDEX_STATE
                // cell in the outgoing mutation is replaced with the mapped value.
                if ((currentState == PIndexState.ACTIVE || currentState == PIndexState.PENDING_ACTIVE) && newState == PIndexState.UNUSABLE) {
                    newState = PIndexState.INACTIVE;
                    newKVs.set(indexStateKVIndex, KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
                        INDEX_STATE_BYTES, timeStamp, Bytes.toBytes(newState.getSerializedValue())));
                } else if ((currentState == PIndexState.INACTIVE || currentState == PIndexState.PENDING_ACTIVE) && newState == PIndexState.USABLE) {
                    // Don't allow manual state change to USABLE (i.e. ACTIVE) if non zero INDEX_DISABLE_TIMESTAMP
                    if (curTimeStampVal != 0) {
                        newState = currentState;
                    } else {
                        newState = PIndexState.ACTIVE;
                    }
                    newKVs.set(indexStateKVIndex, KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
                        INDEX_STATE_BYTES, timeStamp, Bytes.toBytes(newState.getSerializedValue())));
                }

                PTable returnTable = null;
                // Nothing is written when neither the state nor the disable timestamp changes.
                if (currentState != newState || disableTimeStampKVIndex != -1) {
                    // make a copy of tableMetadata so we can add to it
                    tableMetadata = new ArrayList<Mutation>(tableMetadata);
                    // Always include the empty column value at latest timestamp so
                    // that clients pull over update.
                    Put emptyValue = new Put(key);
                    emptyValue.addColumn(TABLE_FAMILY_BYTES, 
                            QueryConstants.EMPTY_COLUMN_BYTES, 
                            HConstants.LATEST_TIMESTAMP, 
                            QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
                    tableMetadata.add(emptyValue);
                    byte[] dataTableKey = null;
                    if (dataTableKV != null) {
                        dataTableKey = SchemaUtil.getTableKey(tenantId, schemaName, CellUtil.cloneValue(dataTableKV));
                        // insert an empty KV to trigger time stamp update on data table row
                        Put p = new Put(dataTableKey);
                        p.addColumn(TABLE_FAMILY_BYTES,
                                QueryConstants.EMPTY_COLUMN_BYTES,
                                HConstants.LATEST_TIMESTAMP,
                                QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
                        tableMetadata.add(p);
                    }
                    boolean setRowKeyOrderOptimizableCell = newState == PIndexState.BUILDING && !rowKeyOrderOptimizable;
                    // We're starting a rebuild of the index, so add our rowKeyOrderOptimizable cell
                    // so that the row keys get generated using the new row key format
                    if (setRowKeyOrderOptimizableCell) {
                        UpgradeUtil.addRowKeyOrderOptimizableCell(tableMetadata, key, timeStamp);
                    }
                    mutateRowsWithLocks(region, tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE,
                        HConstants.NO_NONCE);
                    // Invalidate from cache
                    Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
                    metaDataCache.invalidate(cacheKey);
                    if(dataTableKey != null) {
                        metaDataCache.invalidate(new ImmutableBytesPtr(dataTableKey));
                    }
                    // Only these cases reload the table so it can be attached to the response.
                    if (setRowKeyOrderOptimizableCell || disableTimeStampKVIndex != -1
                            || currentState == PIndexState.DISABLE || newState == PIndexState.BUILDING) {
                        returnTable = doGetTable(key, HConstants.LATEST_TIMESTAMP, rowLock, request.getClientVersion());
                    }
                }
                // Get client timeStamp from mutations, since it may get updated by the
                // mutateRowsWithLocks call
                long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata);
                // TABLE_ALREADY_EXISTS is the return code used on the normal completion path.
                builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS);
                builder.setMutationTime(currentTime);
                if (returnTable != null) {
                    builder.setTable(PTableImpl.toProto(returnTable));
                }
                done.run(builder.build());
                return;
            } finally {
                // Covers every early return inside the locked section as well.
                rowLock.release();
            }
        } catch (Throwable t) {
          logger.error("updateIndexState failed", t);
            ProtobufUtil.setControllerException(controller,
                ServerUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), t));
        }
    }

    /**
     * Verifies that {@code key} is served by {@code region}.
     *
     * @param key row key to check
     * @param region region expected to host the key
     * @param code mutation code to report when the key is not in the region
     * @return {@code null} when the key is in the region, otherwise a result carrying
     *         {@code code}, the current time and no table
     */
    private static MetaDataMutationResult checkKeyInRegion(byte[] key, Region region, MutationCode code) {
        if (ServerUtil.isKeyInRegion(key, region)) {
            return null;
        }
        return new MetaDataMutationResult(code, EnvironmentEdgeManager.currentTimeMillis(), null);
    }

    /**
     * Region check for a table row key.
     *
     * @return {@code null} if the key is in the region, otherwise a TABLE_NOT_IN_REGION result
     */
    private static MetaDataMutationResult checkTableKeyInRegion(byte[] key, Region region) {
        final MutationCode notInRegion = MutationCode.TABLE_NOT_IN_REGION;
        return checkKeyInRegion(key, region, notInRegion);
    }

    /**
     * Region check for a function row key.
     *
     * @return {@code null} if the key is in the region, otherwise a FUNCTION_NOT_IN_REGION result
     */
    private static MetaDataMutationResult checkFunctionKeyInRegion(byte[] key, Region region) {
        final MutationCode notInRegion = MutationCode.FUNCTION_NOT_IN_REGION;
        return checkKeyInRegion(key, region, notInRegion);
    }

    /**
     * Region check for a schema row key.
     *
     * @return {@code null} if the key is in the region, otherwise a SCHEMA_NOT_IN_REGION result
     */
    private static MetaDataMutationResult checkSchemaKeyInRegion(byte[] key, Region region) {
        final MutationCode notInRegion = MutationCode.SCHEMA_NOT_IN_REGION;
        return checkKeyInRegion(key, region, notInRegion);
    }
    
    /**
     * Plain holder for the three byte arrays that name a view: tenant id, schema name and
     * view name. The arrays are stored and returned as-is (no copies are made).
     */
    private static class ViewInfo {
        private byte[] tenantId;
        private byte[] schemaName;
        private byte[] viewName;

        /**
         * @param tenantId tenant id bytes of the view
         * @param schemaName schema name bytes of the view
         * @param viewName view name bytes
         */
        public ViewInfo(byte[] tenantId, byte[] schemaName, byte[] viewName) {
            this.tenantId = tenantId;
            this.schemaName = schemaName;
            this.viewName = viewName;
        }

        /** @return the tenant id bytes passed to the constructor */
        public byte[] getTenantId() {
            return this.tenantId;
        }

        /** @return the schema name bytes passed to the constructor */
        public byte[] getSchemaName() {
            return this.schemaName;
        }

        /** @return the view name bytes passed to the constructor */
        public byte[] getViewName() {
            return this.viewName;
        }
    }

    /**
     * Certain operations, such as DROP TABLE, are not allowed if a table has child views. This
     * class wraps the results of scanning the Phoenix metadata for the child views of a specific
     * table and stores an additional flag recording whether the views were found to be spread
     * over more than one region.
     */
    private static class TableViewFinder {

        private List<ViewInfo> viewInfoList = Lists.newArrayList();
        private boolean allViewsNotInSingleRegion = false;

        /** Creates a finder with no views recorded. */
        private TableViewFinder() {
        }

        /**
         * Creates a finder backed by the given list; the list is used directly, not copied.
         */
        private TableViewFinder(List<ViewInfo> viewInfoList) {
            this.viewInfoList = viewInfoList;
        }

        /** Returns true if at least one view has been recorded. */
        public boolean hasViews() {
            return viewInfoList.size() > 0;
        }

        /** Marks the recorded views as not all living in a single region. */
        private void setAllViewsNotInSingleRegion() {
            this.allViewsNotInSingleRegion = true;
        }

        /** Returns the backing list of recorded views. */
        private List<ViewInfo> getViewInfoList() {
            return this.viewInfoList;
        }

        /**
         * Returns true if the table has views and they are all in the same HBase region.
         */
        private boolean allViewsInSingleRegion() {
            if (viewInfoList.isEmpty()) {
                return false;
            }
            return !allViewsNotInSingleRegion;
        }

        /**
         * Returns true if the table has views and they are NOT all in the same HBase region.
         */
        private boolean allViewsInMultipleRegions() {
            if (viewInfoList.isEmpty()) {
                return false;
            }
            return allViewsNotInSingleRegion;
        }

        /**
         * Merges another finder into this one: its views are appended, and the multi-region
         * flag is raised here when the other finder reports views in multiple regions.
         */
        private void addResult(TableViewFinder result) {
            List<ViewInfo> incoming = result.getViewInfoList();
            this.viewInfoList.addAll(incoming);
            boolean otherSpansRegions = result.allViewsInMultipleRegions();
            if (otherSpansRegions) {
                setAllViewsNotInSingleRegion();
            }
        }
    }

    /**
     * Evicts a single table's entry from the server-side metadata cache.
     * <p>
     * The cache key is built from the tenant id, schema name and table name in the request.
     * Any failure is logged and recorded on the controller.
     *
     * @param controller RPC controller on which failures are recorded
     * @param request names the table (tenant id, schema name, table name)
     * @param done response callback; NOTE(review): this method never invokes it — confirm
     *        callers do not depend on an explicit response
     */
    @Override
    public void clearTableFromCache(RpcController controller, ClearTableFromCacheRequest request,
            RpcCallback<ClearTableFromCacheResponse> done) {
        // Resolved before the try so the catch block can name the table.
        final byte[] schemaName = request.getSchemaName().toByteArray();
        final byte[] tableName = request.getTableName().toByteArray();
        try {
            final ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(
                    SchemaUtil.getTableKey(request.getTenantId().toByteArray(), schemaName, tableName));
            GlobalCache.getInstance(this.env).getMetaDataCache().invalidate(cacheKey);
        } catch (Throwable failure) {
            logger.error("clearTableFromCache failed", failure);
            final String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
            ProtobufUtil.setControllerException(controller,
                ServerUtil.createIOException(fullTableName, failure));
        }
    }

    /**
     * Looks up a schema as of the client timestamp and reports the outcome through {@code done}.
     * <p>
     * Response codes:
     * <ul>
     * <li>SCHEMA_NOT_IN_REGION - the schema key is not served by this region;</li>
     * <li>SCHEMA_ALREADY_EXISTS - a live schema older than the client timestamp was found
     * (the schema is attached);</li>
     * <li>NEWER_SCHEMA_FOUND - the schema found is marked deleted (the schema is attached);</li>
     * <li>SCHEMA_NOT_FOUND - no usable schema was loaded, or the lookup raised an exception.</li>
     * </ul>
     * Every path invokes {@code done}. Previously the "nothing loaded" path returned without
     * responding at all.
     *
     * @param controller RPC controller (not used; failures are reported as SCHEMA_NOT_FOUND)
     * @param request carries the schema name and the client timestamp
     * @param done callback that receives the {@link MetaDataResponse}
     */
    @Override
    public void getSchema(RpcController controller, GetSchemaRequest request, RpcCallback<MetaDataResponse> done) {
        MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
        Region region = env.getRegion();
        String schemaName = request.getSchemaName();
        byte[] lockKey = SchemaUtil.getSchemaKey(schemaName);
        MetaDataMutationResult result = checkSchemaKeyInRegion(lockKey, region);
        if (result != null) {
            done.run(MetaDataMutationResult.toProto(result));
            return;
        }
        long clientTimeStamp = request.getClientTimestamp();
        List<RowLock> locks = Lists.newArrayList();
        try {
            getCoprocessorHost().preGetSchema(schemaName);
            acquireLock(region, lockKey, locks);
            // Get as of latest timestamp so we can detect if we have a
            // newer schema that already
            // exists without making an additional query
            ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(lockKey);
            PSchema schema = loadSchema(env, lockKey, cacheKey, clientTimeStamp, clientTimeStamp);
            if (schema != null && schema.getTimeStamp() < clientTimeStamp) {
                if (!isSchemaDeleted(schema)) {
                    builder.setReturnCode(MetaDataProtos.MutationCode.SCHEMA_ALREADY_EXISTS);
                } else {
                    builder.setReturnCode(MetaDataProtos.MutationCode.NEWER_SCHEMA_FOUND);
                }
                builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                builder.setSchema(PSchema.toProto(schema));
                done.run(builder.build());
                return;
            }
            // Nothing usable was loaded: answer explicitly instead of leaving the callback
            // uninvoked, using the same code the failure path reports.
            builder.setReturnCode(MetaDataProtos.MutationCode.SCHEMA_NOT_FOUND);
            builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
            done.run(builder.build());
            return;
        } catch (Exception e) {
            // The client only sees SCHEMA_NOT_FOUND, so keep the real cause in the server log.
            logger.error("getSchema failed", e);
            long currentTime = EnvironmentEdgeManager.currentTimeMillis();
            builder.setReturnCode(MetaDataProtos.MutationCode.SCHEMA_NOT_FOUND);
            builder.setMutationTime(currentTime);
            done.run(builder.build());
            return;
        } finally {
            releaseRowLocks(region,locks);
        }
    }

    /**
     * Looks up the requested functions for a tenant as of the client timestamp.
     * <p>
     * Each requested name is turned into a function row key and checked against this region;
     * the first key that is not in the region short-circuits with FUNCTION_NOT_IN_REGION.
     * Otherwise the response is FUNCTION_NOT_FOUND when the lookup yields {@code null}, or
     * FUNCTION_ALREADY_EXISTS with every function found attached.
     *
     * @param controller RPC controller on which failures are recorded
     * @param request carries tenant id, function names, function timestamps and client timestamp
     * @param done callback that receives the {@link MetaDataResponse}
     */
    @Override
    public void getFunctions(RpcController controller, GetFunctionsRequest request,
            RpcCallback<MetaDataResponse> done) {
        MetaDataResponse.Builder response = MetaDataResponse.newBuilder();
        byte[] tenantId = request.getTenantId().toByteArray();
        // Collected outside the try so the catch block can report the names seen so far.
        List<String> functionNames = new ArrayList<>(request.getFunctionNamesCount());
        try {
            Region region = env.getRegion();
            List<ByteString> requestedNames = request.getFunctionNamesList();
            List<Long> requestedTimestamps = request.getFunctionTimestampsList();
            List<byte[]> rowKeys = new ArrayList<byte[]>(request.getFunctionNamesCount());
            List<Pair<byte[], Long>> namesWithTimestamps =
                    new ArrayList<Pair<byte[], Long>>(request.getFunctionNamesCount());
            int position = 0;
            for (ByteString requestedName : requestedNames) {
                byte[] nameBytes = requestedName.toByteArray();
                functionNames.add(Bytes.toString(nameBytes));
                byte[] rowKey = SchemaUtil.getFunctionKey(tenantId, nameBytes);
                MetaDataMutationResult notInRegion = checkFunctionKeyInRegion(rowKey, region);
                if (notInRegion != null) {
                    done.run(MetaDataMutationResult.toProto(notInRegion));
                    return;
                }
                // Pairs each name with the timestamp at the same position in the request.
                namesWithTimestamps.add(new Pair<byte[], Long>(nameBytes, requestedTimestamps.get(position)));
                rowKeys.add(rowKey);
                position++;
            }

            // The mutation time is captured before the lookup runs.
            long mutationTime = EnvironmentEdgeManager.currentTimeMillis();
            List<PFunction> found = doGetFunctions(rowKeys, request.getClientTimestamp());
            response.setMutationTime(mutationTime);
            if (found == null) {
                response.setReturnCode(MetaDataProtos.MutationCode.FUNCTION_NOT_FOUND);
            } else {
                response.setReturnCode(MetaDataProtos.MutationCode.FUNCTION_ALREADY_EXISTS);
                for (PFunction each : found) {
                    response.addFunction(PFunction.toProto(each));
                }
            }
            done.run(response.build());
        } catch (Throwable failure) {
            logger.error("getFunctions failed", failure);
            ProtobufUtil.setControllerException(controller,
                ServerUtil.createIOException(functionNames.toString(), failure));
        }
    }

    /**
     * Creates (or replaces) a function from the supplied metadata mutations.
     * <p>
     * The tenant id and function name are taken from the row key of the mutations. Under a row
     * lock on the function key, an existing function is loaded and the outcome is one of:
     * <ul>
     * <li>FUNCTION_NOT_IN_REGION - the function key is not served by this region;</li>
     * <li>FUNCTION_ALREADY_EXISTS - a live, older function exists and replace was not requested
     * (the existing function is attached);</li>
     * <li>NEWER_FUNCTION_FOUND - the existing function is not older than the client timestamp
     * (the existing function is attached);</li>
     * <li>FUNCTION_NOT_FOUND - the function was created or replaced. Mutations are only written
     * when the function is not temporary, and the cache entry is invalidated.</li>
     * </ul>
     * {@code done} is invoked exactly once per call. The replace path used to respond twice:
     * once before and once after the mutation.
     *
     * @param controller RPC controller on which failures are recorded
     * @param request carries the mutations plus the temporary and replace flags
     * @param done callback that receives the {@link MetaDataResponse}
     */
    @Override
    public void createFunction(RpcController controller, CreateFunctionRequest request,
            RpcCallback<MetaDataResponse> done) {
        MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
        byte[][] rowKeyMetaData = new byte[2][];
        // Declared outside the try so the catch block can name the function in its error.
        byte[] functionName = null;
        try {
            List<Mutation> functionMetaData = ProtobufUtil.getMutations(request);
            boolean temporaryFunction = request.getTemporary();
            MetaDataUtil.getTenantIdAndFunctionName(functionMetaData, rowKeyMetaData);
            byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
            functionName = rowKeyMetaData[PhoenixDatabaseMetaData.FUNTION_NAME_INDEX];
            byte[] lockKey = SchemaUtil.getFunctionKey(tenantIdBytes, functionName);
            Region region = env.getRegion();
            MetaDataMutationResult result = checkFunctionKeyInRegion(lockKey, region);
            if (result != null) {
                done.run(MetaDataMutationResult.toProto(result));
                return;
            }
            List<RowLock> locks = Lists.newArrayList();
            long clientTimeStamp = MetaDataUtil.getClientTimeStamp(functionMetaData);
            try {
                acquireLock(region, lockKey, locks);
                // Get as of latest timestamp so we can detect if we have a newer function that already
                // exists without making an additional query
                ImmutableBytesPtr cacheKey = new FunctionBytesPtr(lockKey);
                PFunction function =
                        loadFunction(env, lockKey, cacheKey, clientTimeStamp, clientTimeStamp, request.getReplace(), functionMetaData);
                if (function != null) {
                    if (function.getTimeStamp() < clientTimeStamp) {
                        // If the function is older than the client time stamp and it's deleted,
                        // continue
                        if (!isFunctionDeleted(function)) {
                            if (!request.getReplace()) {
                                builder.setReturnCode(MetaDataProtos.MutationCode.FUNCTION_ALREADY_EXISTS);
                                builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                                builder.addFunction(PFunction.toProto(function));
                                done.run(builder.build());
                                return;
                            }
                            // Replace requested: do not respond yet. The function being replaced
                            // is still attached so the single final response carries the same
                            // content the last response used to carry.
                            builder.addFunction(PFunction.toProto(function));
                        }
                    } else {
                        builder.setReturnCode(MetaDataProtos.MutationCode.NEWER_FUNCTION_FOUND);
                        builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                        builder.addFunction(PFunction.toProto(function));
                        done.run(builder.build());
                        return;
                    }
                }
                // Don't store function info for temporary functions.
                if(!temporaryFunction) {
                    mutateRowsWithLocks(region, functionMetaData, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE);
                }

                // Invalidate the cache - the next getFunction call will add it
                // TODO: consider loading the function that was just created here, patching up the parent function, and updating the cache
                Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
                metaDataCache.invalidate(cacheKey);
                // Get timeStamp from mutations - the above method sets it if it's unset
                long currentTimeStamp = MetaDataUtil.getClientTimeStamp(functionMetaData);
                builder.setReturnCode(MetaDataProtos.MutationCode.FUNCTION_NOT_FOUND);
                builder.setMutationTime(currentTimeStamp);
                done.run(builder.build());
                return;
            } finally {
                releaseRowLocks(region,locks);
            }
        } catch (Throwable t) {
            logger.error("createFunction failed", t);
            ProtobufUtil.setControllerException(controller,
                ServerUtil.createIOException(Bytes.toString(functionName), t));
        }
    }

    /**
     * Drops a function described by the supplied metadata mutations.
     * <p>
     * The tenant id and function name are taken from the row key of the mutations. If the
     * function key is not in this region, that result is returned immediately. Otherwise, under
     * a row lock, {@code doDropFunction} decides the outcome. Only when it reports
     * FUNCTION_ALREADY_EXISTS are the mutations written and each key in the invalidate list
     * replaced in the metadata cache by a deleted-function marker. The result of
     * {@code doDropFunction} is what is sent back in every case.
     *
     * @param controller RPC controller on which failures are recorded
     * @param request carries the mutations
     * @param done callback that receives the {@link MetaDataResponse}
     */
    @Override
    public void dropFunction(RpcController controller, DropFunctionRequest request,
            RpcCallback<MetaDataResponse> done) {
        byte[][] rowKeyMetaData = new byte[2][];
        // Declared outside the try so the catch block can name the function in its error.
        byte[] functionName = null;
        try {
            List<Mutation> dropMutations = ProtobufUtil.getMutations(request);
            MetaDataUtil.getTenantIdAndFunctionName(dropMutations, rowKeyMetaData);
            byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
            functionName = rowKeyMetaData[PhoenixDatabaseMetaData.FUNTION_NAME_INDEX];
            byte[] functionKey = SchemaUtil.getFunctionKey(tenantIdBytes, functionName);
            Region region = env.getRegion();
            MetaDataMutationResult outcome = checkFunctionKeyInRegion(functionKey, region);
            if (outcome != null) {
                done.run(MetaDataMutationResult.toProto(outcome));
                return;
            }
            List<RowLock> heldLocks = Lists.newArrayList();
            long clientTimeStamp = MetaDataUtil.getClientTimeStamp(dropMutations);
            try {
                acquireLock(region, functionKey, heldLocks);
                List<byte[]> functionKeys = new ArrayList<byte[]>(1);
                functionKeys.add(functionKey);
                List<ImmutableBytesPtr> keysToInvalidate = new ArrayList<ImmutableBytesPtr>();

                outcome = doDropFunction(clientTimeStamp, functionKeys, dropMutations, keysToInvalidate);
                if (outcome.getMutationCode() == MutationCode.FUNCTION_ALREADY_EXISTS) {
                    mutateRowsWithLocks(region, dropMutations, Collections.<byte[]> emptySet(),
                            HConstants.NO_NONCE, HConstants.NO_NONCE);

                    Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache =
                            GlobalCache.getInstance(this.env).getMetaDataCache();
                    long deletionTime = MetaDataUtil.getClientTimeStamp(dropMutations);
                    // Swap each cached entry for a marker recording the deletion time.
                    for (ImmutableBytesPtr cachedKey : keysToInvalidate) {
                        metaDataCache.invalidate(cachedKey);
                        metaDataCache.put(cachedKey, newDeletedFunctionMarker(deletionTime));
                    }
                }
                done.run(MetaDataMutationResult.toProto(outcome));
                return;
            } finally {
                releaseRowLocks(region, heldLocks);
            }
        } catch (Throwable failure) {
            logger.error("dropFunction failed", failure);
            ProtobufUtil.setControllerException(controller,
                ServerUtil.createIOException(Bytes.toString(functionName), failure));
        }
    }

    /**
     * Works out whether the function identified by {@code keys.get(0)} can be dropped and, if so,
     * queues the row deletes.
     * <p>
     * Outcomes:
     * <ul>
     * <li>No function is returned by {@code doGetFunctions}: {@code FUNCTION_ALREADY_EXISTS} if
     * {@code buildDeletedFunction} returns non-null, otherwise {@code FUNCTION_NOT_FOUND}.</li>
     * <li>The function's timestamp is not older than {@code clientTimeStamp}, or the function is
     * flagged as deleted: {@code FUNCTION_NOT_FOUND}.</li>
     * <li>Otherwise: the key is added to {@code invalidateList}, one {@link Delete} stamped with
     * {@code clientTimeStamp} is appended to {@code functionMetaData} per scanned row, and
     * {@code FUNCTION_ALREADY_EXISTS} is returned together with the functions found.</li>
     * </ul>
     *
     * @param clientTimeStamp  timestamp supplied by the client
     * @param keys             function keys; only the first entry is used beyond the lookup
     * @param functionMetaData mutation list that receives the generated deletes
     * @param invalidateList   receives the cache key to invalidate when the drop proceeds
     * @return the result describing what the caller should do
     * @throws IOException  if reading from the region fails
     * @throws SQLException declared by the helpers this method calls
     */
    private MetaDataMutationResult doDropFunction(long clientTimeStamp, List<byte[]> keys, List<Mutation> functionMetaData, List<ImmutableBytesPtr> invalidateList)
            throws IOException, SQLException {
        final List<PFunction> found = doGetFunctions(new ArrayList<byte[]>(keys), clientTimeStamp);

        // Nothing found at this timestamp: the answer depends on whether a deleted function can be built.
        if (found == null || found.isEmpty()) {
            final byte[] missingKey = keys.get(0);
            final MutationCode code =
                    buildDeletedFunction(missingKey, new FunctionBytesPtr(missingKey), env.getRegion(), clientTimeStamp) != null
                            ? MutationCode.FUNCTION_ALREADY_EXISTS
                            : MutationCode.FUNCTION_NOT_FOUND;
            return new MetaDataMutationResult(code, EnvironmentEdgeManager.currentTimeMillis(), null);
        }

        final PFunction function = found.get(0);
        // Only a live function strictly older than the client timestamp may be dropped.
        if (function.getTimeStamp() >= clientTimeStamp || isFunctionDeleted(function)) {
            return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND,
                    EnvironmentEdgeManager.currentTimeMillis(), null);
        }

        final byte[] functionKey = keys.get(0);
        invalidateList.add(new FunctionBytesPtr(functionKey));
        final Region region = env.getRegion();
        final Scan scan = MetaDataUtil.newTableRowsScan(functionKey, MIN_TABLE_TIMESTAMP, clientTimeStamp);
        final List<Cell> cells = Lists.newArrayList();
        try (RegionScanner scanner = region.getScanner(scan)) {
            scanner.next(cells);
            if (cells.isEmpty()) { // Should not be possible
                return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND,
                        EnvironmentEdgeManager.currentTimeMillis(), null);
            }
            // One delete per scanned row, keyed off the first cell of each batch.
            while (!cells.isEmpty()) {
                final Cell first = cells.get(0);
                functionMetaData.add(
                        new Delete(first.getRowArray(), first.getRowOffset(), first.getRowLength(), clientTimeStamp));
                cells.clear();
                scanner.next(cells);
            }
        }
        return new MetaDataMutationResult(MutationCode.FUNCTION_ALREADY_EXISTS,
                EnvironmentEdgeManager.currentTimeMillis(), found, true);
    }

    /**
     * Coprocessor endpoint that creates a schema by writing its metadata mutations.
     * <p>
     * The row of the header mutation is used as both the lock key and the cache key. If
     * {@code checkSchemaKeyInRegion} yields a result, it is returned immediately. Otherwise, under the
     * row lock, the existing schema (if any) is loaded and one of the following is sent to
     * {@code done}:
     * <ul>
     * <li>{@code SCHEMA_ALREADY_EXISTS} - a schema older than the client timestamp exists and is not
     * flagged as deleted;</li>
     * <li>{@code NEWER_SCHEMA_FOUND} - a schema exists whose timestamp is not older than the client
     * timestamp;</li>
     * <li>{@code SCHEMA_NOT_FOUND} - no conflicting schema was found; the mutations have been applied
     * and the cache entry invalidated.</li>
     * </ul>
     * Any {@link Throwable} is logged and attached to {@code controller}; {@code done} is not run in
     * that case.
     *
     * @param controller RPC controller that receives the failure, if any
     * @param request    request carrying the schema name and its metadata mutations
     * @param done       callback invoked with the response
     */
    @Override
    public void createSchema(RpcController controller, CreateSchemaRequest request,
            RpcCallback<MetaDataResponse> done) {
        MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
        String schemaName = null;
        try {
            List<Mutation> schemaMutations = ProtobufUtil.getMutations(request);
            schemaName = request.getSchemaName();
            Mutation m = MetaDataUtil.getPutOnlyTableHeaderRow(schemaMutations);

            byte[] lockKey = m.getRow();
            Region region = env.getRegion();
            MetaDataMutationResult result = checkSchemaKeyInRegion(lockKey, region);
            if (result != null) {
                done.run(MetaDataMutationResult.toProto(result));
                return;
            }
            List<RowLock> locks = Lists.newArrayList();
            long clientTimeStamp = MetaDataUtil.getClientTimeStamp(schemaMutations);
            try {
                acquireLock(region, lockKey, locks);
                // Get as of latest timestamp so we can detect if we have a newer schema that already exists without
                // making an additional query
                ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(lockKey);
                PSchema schema = loadSchema(env, lockKey, cacheKey, clientTimeStamp, clientTimeStamp);
                if (schema != null) {
                    if (schema.getTimeStamp() < clientTimeStamp) {
                        // An older schema only blocks creation if it has not been deleted.
                        if (!isSchemaDeleted(schema)) {
                            builder.setReturnCode(MetaDataProtos.MutationCode.SCHEMA_ALREADY_EXISTS);
                            builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                            builder.setSchema(PSchema.toProto(schema));
                            done.run(builder.build());
                            return;
                        }
                    } else {
                        builder.setReturnCode(MetaDataProtos.MutationCode.NEWER_SCHEMA_FOUND);
                        builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
                        builder.setSchema(PSchema.toProto(schema));
                        done.run(builder.build());
                        return;
                    }
                }
                mutateRowsWithLocks(region, schemaMutations, Collections.<byte[]> emptySet(), HConstants.NO_NONCE,
                        HConstants.NO_NONCE);

                // Invalidate the cache - the next getSchema call will add it.
                // cacheKey is freshly constructed above, so it can never be null here.
                Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env)
                        .getMetaDataCache();
                metaDataCache.invalidate(cacheKey);

                // Get timeStamp from mutations - the above method sets it if
                // it's unset
                long currentTimeStamp = MetaDataUtil.getClientTimeStamp(schemaMutations);
                // SCHEMA_NOT_FOUND is the code sent on this path, i.e. after the mutations were applied.
                builder.setReturnCode(MetaDataProtos.MutationCode.SCHEMA_NOT_FOUND);
                builder.setMutationTime(currentTimeStamp);
                done.run(builder.build());
                return;
            } finally {
                releaseRowLocks(region,locks);
            }
        } catch (Throwable t) {
            // Spaces around the schema name keep the log line readable.
            logger.error("Creating the schema " + schemaName + " failed", t);
            ProtobufUtil.setControllerException(controller, ServerUtil.createIOException(schemaName, t));
        }
    }

    /**
     * Coprocessor endpoint that drops a schema's metadata row.
     * <p>
     * After notifying the coprocessor host through {@code preDropSchema}, the schema key is derived
     * from the schema name and used as the lock row. If {@code checkSchemaKeyInRegion} yields a
     * result, it is returned immediately. Otherwise, under the row lock, {@link #doDropSchema}
     * decides the outcome; only when it reports {@code SCHEMA_ALREADY_EXISTS} are the mutations
     * applied and the affected cache entries replaced with deleted-schema markers.
     * <p>
     * Any {@link Throwable} is logged and attached to {@code controller}; {@code done} is not run in
     * that case.
     *
     * @param controller RPC controller that receives the failure, if any
     * @param request    request carrying the schema name and its metadata mutations
     * @param done       callback invoked with the mutation result
     */
    @Override
    public void dropSchema(RpcController controller, DropSchemaRequest request, RpcCallback<MetaDataResponse> done) {
        String schemaName = null;
        try {
            final List<Mutation> dropMutations = ProtobufUtil.getMutations(request);
            schemaName = request.getSchemaName();
            getCoprocessorHost().preDropSchema(schemaName);
            final byte[] schemaKey = SchemaUtil.getSchemaKey(schemaName);
            final Region region = env.getRegion();

            // A non-null result from the key check is handed straight back to the caller.
            final MetaDataMutationResult keyCheck = checkSchemaKeyInRegion(schemaKey, region);
            if (keyCheck != null) {
                done.run(MetaDataMutationResult.toProto(keyCheck));
                return;
            }

            final List<RowLock> rowLocks = Lists.newArrayList();
            final long clientTimeStamp = MetaDataUtil.getClientTimeStamp(dropMutations);
            try {
                acquireLock(region, schemaKey, rowLocks);
                final List<ImmutableBytesPtr> toInvalidate = new ArrayList<ImmutableBytesPtr>(1);
                final MetaDataMutationResult dropResult =
                        doDropSchema(clientTimeStamp, schemaName, schemaKey, dropMutations, toInvalidate);
                // SCHEMA_ALREADY_EXISTS is the only code for which the delete is actually carried out.
                if (dropResult.getMutationCode() == MutationCode.SCHEMA_ALREADY_EXISTS) {
                    mutateRowsWithLocks(region, dropMutations, Collections.<byte[]> emptySet(),
                            HConstants.NO_NONCE, HConstants.NO_NONCE);
                    final Cache<ImmutableBytesPtr, PMetaDataEntity> cache =
                            GlobalCache.getInstance(this.env).getMetaDataCache();
                    final long markerTimeStamp = MetaDataUtil.getClientTimeStamp(dropMutations);
                    // Swap each cached entry for a marker recording the deletion.
                    for (ImmutableBytesPtr cacheKey : toInvalidate) {
                        cache.invalidate(cacheKey);
                        cache.put(cacheKey, newDeletedSchemaMarker(markerTimeStamp));
                    }
                }
                done.run(MetaDataMutationResult.toProto(dropResult));
            } finally {
                releaseRowLocks(region, rowLocks);
            }
        } catch (Throwable t) {
            logger.error("drop schema failed:", t);
            ProtobufUtil.setControllerException(controller, ServerUtil.createIOException(schemaName, t));
        }
    }

    /**
     * Works out whether the schema stored under {@code key} can be dropped.
     * <p>
     * Outcomes:
     * <ul>
     * <li>{@code SCHEMA_NOT_FOUND} - no schema could be loaded, its timestamp is not older than
     * {@code clientTimeStamp}, or the scan returned no rows;</li>
     * <li>{@code TABLES_EXIST_ON_SCHEMA} - the scan returned a row whose key differs from
     * {@code key};</li>
     * <li>{@code SCHEMA_ALREADY_EXISTS} - the schema can be dropped; {@code key} has been added to
     * {@code invalidateList}.</li>
     * </ul>
     *
     * @param clientTimeStamp timestamp supplied by the client
     * @param schemaName      name of the schema to drop
     * @param key             row key of the schema
     * @param schemaMutations mutations from the request (not modified by this method)
     * @param invalidateList  receives the cache key to invalidate when the drop proceeds
     * @return the result describing what the caller should do
     * @throws IOException  if reading from the region fails
     * @throws SQLException declared by the helpers this method calls
     */
    private MetaDataMutationResult doDropSchema(long clientTimeStamp, String schemaName, byte[] key,
            List<Mutation> schemaMutations, List<ImmutableBytesPtr> invalidateList) throws IOException, SQLException {
        final PSchema schema = loadSchema(env, key, new ImmutableBytesPtr(key), clientTimeStamp, clientTimeStamp);
        // Only a schema strictly older than the client timestamp is a candidate for dropping.
        if (schema == null || schema.getTimeStamp() >= clientTimeStamp) {
            return new MetaDataMutationResult(MutationCode.SCHEMA_NOT_FOUND,
                    EnvironmentEdgeManager.currentTimeMillis(), null);
        }

        final Region region = env.getRegion();
        final Scan scan = MetaDataUtil.newTableRowsScan(SchemaUtil.getKeyForSchema(null, schemaName),
                MIN_TABLE_TIMESTAMP, clientTimeStamp);
        final List<Cell> cells = Lists.newArrayList();
        boolean foundForeignRow = false;
        try (RegionScanner scanner = region.getScanner(scan)) {
            scanner.next(cells);
            if (cells.isEmpty()) { // Should not be possible
                return new MetaDataMutationResult(MutationCode.SCHEMA_NOT_FOUND,
                        EnvironmentEdgeManager.currentTimeMillis(), null);
            }
            while (!cells.isEmpty()) {
                final Cell first = cells.get(0);
                final boolean isSchemaRow = Bytes.compareTo(first.getRowArray(), first.getRowOffset(),
                        first.getRowLength(), key, 0, key.length) == 0;
                if (!isSchemaRow) {
                    // A row other than the schema's own row means the schema still has tables.
                    foundForeignRow = true;
                    break;
                }
                cells.clear();
                scanner.next(cells);
            }
        }

        if (foundForeignRow) {
            return new MetaDataMutationResult(MutationCode.TABLES_EXIST_ON_SCHEMA, schema,
                    EnvironmentEdgeManager.currentTimeMillis());
        }
        invalidateList.add(new ImmutableBytesPtr(key));
        return new MetaDataMutationResult(MutationCode.SCHEMA_ALREADY_EXISTS, schema,
                EnvironmentEdgeManager.currentTimeMillis());
    }
    
    /**
     * Applies {@code mutations} to {@code region} via {@code Region.mutateRowsWithLocks}.
     * <p>
     * When access checks are disabled the call is made directly. When they are enabled the call is
     * executed through {@code User.runAsLoginUser}, with the RPC context cleared for the duration of
     * the mutation and restored afterwards.
     *
     * @param region     region to mutate
     * @param mutations  mutations to apply
     * @param rowsToLock additional rows to lock
     * @param nonceGroup nonce group passed through to the region
     * @param nonce      nonce passed through to the region
     * @throws IOException if the mutation fails; in the access-check path every {@link Throwable}
     *                     raised by the mutation is wrapped in an {@code IOException}
     */
    private void mutateRowsWithLocks(final Region region, final List<Mutation> mutations, final Set<byte[]> rowsToLock,
            final long nonceGroup, final long nonce) throws IOException {
        if (!this.accessCheckEnabled) {
            region.mutateRowsWithLocks(mutations, rowsToLock, nonceGroup, nonce);
            return;
        }

        // With access checks on, SYSTEM.CATALOG has to be mutated as the HBase/login user.
        final PrivilegedExceptionAction<Void> mutateAsLoginUser = new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() throws Exception {
                final Call savedRpcContext = RpcUtil.getRpcContext();
                try {
                    // Clear the RPC context so the user can be reset for this call.
                    RpcUtil.setRpcContext(null);
                    region.mutateRowsWithLocks(mutations, rowsToLock, nonceGroup, nonce);
                    return null;
                } catch (Throwable cause) {
                    throw new IOException(cause);
                } finally {
                    // Always put the original RPC context back.
                    RpcUtil.setRpcContext(savedRpcContext);
                }
            }
        };
        User.runAsLoginUser(mutateAsLoginUser);
    }
    
    /**
     * Resolves the HBase {@link TableName} associated with {@code table}, depending on its type:
     * <ul>
     * <li>{@code VIEW} - built from the table's physical name;</li>
     * <li>{@code INDEX} - built from the parent schema name and parent table name;</li>
     * <li>any other type - built from the table's own schema name and table name.</li>
     * </ul>
     * For the last two cases the name is produced by {@code SchemaUtil.getPhysicalHBaseTableName},
     * taking the table's namespace-mapping flag into account.
     *
     * @param table table whose physical HBase table name is wanted
     * @return the resolved HBase table name
     */
    private TableName getParentPhysicalTableName(PTable table) {
        if (table.getType() == PTableType.VIEW) {
            return TableName.valueOf(table.getPhysicalName().getBytes());
        }
        if (table.getType() == PTableType.INDEX) {
            return TableName.valueOf(
                    SchemaUtil.getPhysicalHBaseTableName(table.getParentSchemaName(),
                            table.getParentTableName(), table.isNamespaceMapped()).getBytes());
        }
        return TableName.valueOf(
                SchemaUtil.getPhysicalHBaseTableName(table.getSchemaName(),
                        table.getTableName(), table.isNamespaceMapped()).getBytes());
    }
}
